tq.c 19.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
L
Liu Jicong 已提交
17
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20
// static
// read next version data
L
Liu Jicong 已提交
21
//
L
Liu Jicong 已提交
22
// send to fetch queue
L
Liu Jicong 已提交
23
//
L
Liu Jicong 已提交
24
// handle management message
L
Liu Jicong 已提交
25
//
L
Liu Jicong 已提交
26

L
Liu Jicong 已提交
27 28 29
int tqGroupSSize(const STqGroup* pGroup);
int tqTopicSSize();
int tqItemSSize();
L
Liu Jicong 已提交
30

L
Liu Jicong 已提交
31 32 33
void* tqSerializeListHandle(STqList* listHandle, void* ptr);
void* tqSerializeTopic(STqTopic* pTopic, void* ptr);
void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
int tqInit() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
  if(old == 1) return 0;

  tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
  return 0;
}

void tqCleanUp() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
  if(old == 0) return;
  taosTmrStop(tqMgmt.timer);
  taosTmrCleanUp(tqMgmt.timer);
}

STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) {
L
Liu Jicong 已提交
54
  STQ* pTq = malloc(sizeof(STQ));
L
Liu Jicong 已提交
55
  if (pTq == NULL) {
L
Liu Jicong 已提交
56
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
57 58
    return NULL;
  }
H
Hongze Cheng 已提交
59
  pTq->path = strdup(path);
L
Liu Jicong 已提交
60
  pTq->tqConfig = tqConfig;
L
Liu Jicong 已提交
61
  pTq->tqLogHandle = tqLogHandle;
H
more  
Hongze Cheng 已提交
62
#if 0
L
Liu Jicong 已提交
63
  pTq->tqMemRef.pAllocatorFactory = allocFac;
L
Liu Jicong 已提交
64 65
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
L
Liu Jicong 已提交
66
    // TODO: error code of buffer pool
L
Liu Jicong 已提交
67
  }
H
more  
Hongze Cheng 已提交
68
#endif
L
Liu Jicong 已提交
69
  pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
L
Liu Jicong 已提交
70
  if (pTq->tqMeta == NULL) {
L
Liu Jicong 已提交
71
    free(pTq);
L
Liu Jicong 已提交
72
#if 0
L
Liu Jicong 已提交
73
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
L
Liu Jicong 已提交
74
#endif
L
Liu Jicong 已提交
75 76
    return NULL;
  }
L
Liu Jicong 已提交
77

L
Liu Jicong 已提交
78 79
  return pTq;
}
L
Liu Jicong 已提交
80

L
Liu Jicong 已提交
81 82 83
void tqClose(STQ* pTq) {
  // TODO
}
L
Liu Jicong 已提交
84

L
Liu Jicong 已提交
85 86 87 88
static int tqProtoCheck(STqMsgHead* pMsg) {
  // TODO
  return pMsg->protoVer == 0;
}
L
Liu Jicong 已提交
89

L
Liu Jicong 已提交
90
static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
L
Liu Jicong 已提交
91
  // clean old item and move forward
L
Liu Jicong 已提交
92
  int32_t consumeOffset = pAck->consumeOffset;
L
Liu Jicong 已提交
93
  int     idx = consumeOffset % TQ_BUFFER_SIZE;
L
Liu Jicong 已提交
94 95
  ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor);
  tfree(pTopic->buffer[idx].content);
L
Liu Jicong 已提交
96 97 98
  if (1 /* TODO: need to launch new query */) {
    STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
    if (pNewQuery == NULL) {
L
Liu Jicong 已提交
99
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
100 101
      return -1;
    }
L
Liu Jicong 已提交
102 103
    // TODO: lock executor
    // TODO: read from wal and assign to src
L
Liu Jicong 已提交
104 105 106 107 108
    /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
    /*pNewQuery->exec->src = 0;*/
    /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
    /*pNewQuery->next = *ppQuery;*/
    /**ppQuery = pNewQuery;*/
L
Liu Jicong 已提交
109
  }
L
Liu Jicong 已提交
110 111 112
  return 0;
}

L
Liu Jicong 已提交
113
static int tqAck(STqGroup* pGroup, STqAcks* pAcks) {
L
Liu Jicong 已提交
114
  int32_t    ackNum = pAcks->ackNum;
L
Liu Jicong 已提交
115
  STqOneAck* acks = pAcks->acks;
L
Liu Jicong 已提交
116
  // double ptr for acks and list
L
Liu Jicong 已提交
117 118 119 120
  int          i = 0;
  STqList*     node = pGroup->head;
  int          ackCnt = 0;
  STqQueryMsg* pQuery = NULL;
L
Liu Jicong 已提交
121
  while (i < ackNum && node->next) {
L
Liu Jicong 已提交
122
    if (acks[i].topicId == node->next->topic.topicId) {
L
Liu Jicong 已提交
123
      ackCnt++;
L
Liu Jicong 已提交
124 125
      tqAckOneTopic(&node->next->topic, &acks[i], &pQuery);
    } else if (acks[i].topicId < node->next->topic.topicId) {
L
Liu Jicong 已提交
126 127 128
      i++;
    } else {
      node = node->next;
L
Liu Jicong 已提交
129 130
    }
  }
L
Liu Jicong 已提交
131 132
  if (pQuery) {
    // post message
L
Liu Jicong 已提交
133 134 135
  }
  return ackCnt;
}
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
static int tqCommitGroup(STqGroup* pGroup) {
L
Liu Jicong 已提交
138
  // persist modification into disk
L
Liu Jicong 已提交
139 140 141
  return 0;
}

L
Liu Jicong 已提交
142
int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) {
L
Liu Jicong 已提交
143
  // create in disk
L
Liu Jicong 已提交
144 145
  STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup));
  if (pGroup == NULL) {
L
Liu Jicong 已提交
146
    // TODO
L
Liu Jicong 已提交
147 148
    return -1;
  }
L
Liu Jicong 已提交
149 150
  *ppGroup = pGroup;
  memset(pGroup, 0, sizeof(STqGroup));
L
Liu Jicong 已提交
151

L
Liu Jicong 已提交
152 153 154 155 156 157 158
  pGroup->topicList = tdListNew(sizeof(STqTopic));
  if(pGroup->topicList == NULL) {
    free(pGroup);
    return -1;
  }
  *ppGroup = pGroup;

L
Liu Jicong 已提交
159 160 161
  return 0;
}

L
Liu Jicong 已提交
162 163 164 165 166
STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId);
  if (pGroup == NULL) {
    int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup);
    if (code < 0) {
L
Liu Jicong 已提交
167
      // TODO
L
Liu Jicong 已提交
168 169
      return NULL;
    }
L
Liu Jicong 已提交
170
    tqHandleMovePut(pTq->tqMeta, cId, pGroup);
L
Liu Jicong 已提交
171
  }
L
Liu Jicong 已提交
172
  ASSERT(pGroup);
L
Liu Jicong 已提交
173

L
Liu Jicong 已提交
174
  return pGroup;
L
Liu Jicong 已提交
175
}
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177 178 179 180
int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  // TODO
  return 0;
}
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
L
Liu Jicong 已提交
183
  // delete from disk
L
Liu Jicong 已提交
184 185 186
  return 0;
}

L
Liu Jicong 已提交
187 188 189
static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
  STqList* pHead = pGroup->head;
  STqList* pNode = pHead;
L
Liu Jicong 已提交
190
  int      totSize = 0;
L
Liu Jicong 已提交
191
  int      numOfMsgs = 0;
L
Liu Jicong 已提交
192
  // TODO: make it a macro
L
Liu Jicong 已提交
193 194 195 196 197
  int   sizeLimit = 4 * 1024;

  void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
198 199
    return -1;
  }
L
Liu Jicong 已提交
200 201 202
  *pRsp = ptr;
  STqMsgContent* buffer = (*pRsp)->msgs;

L
Liu Jicong 已提交
203 204
  // iterate the list to get msgs of all topics
  // until all topic iterated or msgs over sizeLimit
L
Liu Jicong 已提交
205 206 207 208 209 210
  while (pNode->next) {
    pNode = pNode->next;
    STqTopic* pTopic = &pNode->topic;
    int       idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE;
    if (pTopic->buffer[idx].content != NULL && pTopic->buffer[idx].offset == pTopic->nextConsumeOffset) {
      totSize += pTopic->buffer[idx].size;
L
Liu Jicong 已提交
211
      if (totSize > sizeLimit) {
L
Liu Jicong 已提交
212
        void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + totSize);
L
Liu Jicong 已提交
213
        if (ptr == NULL) {
L
Liu Jicong 已提交
214 215
          totSize -= pTopic->buffer[idx].size;
          terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
216
          // return msgs already copied
L
Liu Jicong 已提交
217 218
          break;
        }
L
Liu Jicong 已提交
219 220
        *pRsp = ptr;
        break;
L
Liu Jicong 已提交
221
      }
L
Liu Jicong 已提交
222
      *((int64_t*)buffer) = pTopic->topicId;
L
Liu Jicong 已提交
223
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
224
      *((int64_t*)buffer) = pTopic->buffer[idx].size;
L
Liu Jicong 已提交
225
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
226 227 228
      memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
      buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
      numOfMsgs++;
L
Liu Jicong 已提交
229
      if (totSize > sizeLimit) {
L
Liu Jicong 已提交
230 231 232 233
        break;
      }
    }
  }
L
Liu Jicong 已提交
234 235
  (*pRsp)->bodySize = totSize;
  return numOfMsgs;
L
Liu Jicong 已提交
236 237
}

L
Liu Jicong 已提交
238
STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); }
L
Liu Jicong 已提交
239

L
Liu Jicong 已提交
240 241 242 243 244 245 246 247 248 249 250
int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) {
  if (tqQueryExecuting(bufItem->status)) {
    return 0;
  }
  bufItem->status = 1;
  // load data from wal or buffer pool
  // put into exec
  // send exec into non blocking queue
  // when query finished, put into buffer pool
  return 0;
}
L
Liu Jicong 已提交
251

L
Liu Jicong 已提交
252
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
L
Liu Jicong 已提交
253
/*return 0;*/
L
Liu Jicong 已提交
254 255
/*}*/

L
Liu Jicong 已提交
256 257 258
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
  // add reference
  // judge and launch new query
L
Liu Jicong 已提交
259 260 261
  return 0;
}

L
Liu Jicong 已提交
262
int tqCommit(STQ* pTq) {
L
Liu Jicong 已提交
263
  // do nothing
L
Liu Jicong 已提交
264 265 266
  return 0;
}

L
Liu Jicong 已提交
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) {
  int code;
  memset(pTopic->buffer, 0, sizeof(pTopic->buffer));
  // launch query
  for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) {
    int pos = i % TQ_BUFFER_SIZE;
    code = tqSendLaunchQuery(&pTopic->buffer[pos], offset);
    if (code < 0) {
      // TODO: error handling
    }
  }
  // set offset
  pTopic->nextConsumeOffset = offset;
  pTopic->floatingCursor = offset;
  return 0;
}

STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) {
  // TODO
  return NULL;
}

int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
  int       code;
  int64_t   clientId = pMsg->head.clientId;
  int64_t   topicId = pMsg->topicId;
  int64_t   offset = pMsg->offset;
  STqGroup* gHandle = tqGetGroup(pTq, clientId);
  if (gHandle == NULL) {
    // client not connect
    return -1;
  }
  STqTopic* topicHandle = tqFindTopic(gHandle, topicId);
  if (topicHandle == NULL) {
    return -1;
  }
  if (pMsg->offset == topicHandle->nextConsumeOffset) {
    return 0;
  }
  // TODO: check log last version

  code = tqBufferSetOffset(topicHandle, offset);
  if (code < 0) {
    // set error code
    return -1;
  }

L
Liu Jicong 已提交
314 315 316
  return 0;
}

L
Liu Jicong 已提交
317 318
// temporary
int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
L
Liu Jicong 已提交
319 320 321 322 323 324
  int64_t   clientId = pMsg->head.clientId;
  STqGroup* pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }
L
Liu Jicong 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
  pGroup->rspHandle.handle = pRsp->handle;
  pGroup->rspHandle.ahandle = pRsp->ahandle;

  return 0;
}

int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
  STqConsumeReq *pMsg = pReq->pCont;
  int64_t   clientId = pMsg->head.clientId;
  STqGroup* pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }

  SList* topicList = pGroup->topicList;

  int totSize = 0;
  int numOfMsgs = 0;
  int sizeLimit = 4096;


  STqConsumeRsp *pCsmRsp = (*pRsp)->pCont;
  void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  (*pRsp)->pCont = ptr;

  SListIter iter;
  tdListInitIter(topicList, &iter, TD_LIST_FORWARD);

  STqMsgContent* buffer = NULL;
  SArray* pArray = taosArrayInit(0, sizeof(void*));

  SListNode *pn;
  while((pn = tdListNext(&iter)) != NULL) {
    STqTopic* pTopic = *(STqTopic**)pn->data;
    int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
    STqMsgItem* pItem = &pTopic->buffer[idx];
    if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
      if(pItem->status == TQ_ITEM_READY) {
        //if has data
        totSize += pTopic->buffer[idx].size;
        if (totSize > sizeLimit) {
          void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
          if (ptr == NULL) {
            totSize -= pTopic->buffer[idx].size;
            terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
            // return msgs already copied
            break;
          }
          (*pRsp)->pCont = ptr;
          break;
        }
L
Liu Jicong 已提交
381
        *((int64_t*)buffer) = htobe64(pTopic->topicId);
L
Liu Jicong 已提交
382
        buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
383
        *((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size);
L
Liu Jicong 已提交
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
        buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
        memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
        buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
        numOfMsgs++;
        if (totSize > sizeLimit) {
          break;
        }
      } else if(pItem->status == TQ_ITEM_PROCESS) {
        //if not have data but in process

      } else if(pItem->status == TQ_ITEM_EMPTY){
        //if not have data and not in process
        int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
        if(old != TQ_ITEM_EMPTY) {
          continue;
        }
        pItem->offset = pTopic->floatingCursor;
        taosArrayPush(pArray, &pItem);
      } else {
        ASSERT(0);
      }
    }
  }

L
Liu Jicong 已提交
408 409 410 411 412 413 414 415 416 417 418
  if (numOfMsgs > 0) {
    // set code and other msg
    rpcSendResponse(*pRsp);
  } else {
    // most recent data has been fetched

    // enable timer for blocking wait
    // once new data written when waiting, launch query and rsp
  }

  // fetched a num of msgs, rpc response
L
Liu Jicong 已提交
419 420 421 422
  for(int i = 0; i < pArray->size; i++) {
    STqMsgItem* pItem = taosArrayGet(pArray, i);

    //read from wal
L
Liu Jicong 已提交
423 424 425 426 427 428
    void* raw = NULL;
    /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
    int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);
    if(code < 0) {
      //TODO: error
    }
L
Liu Jicong 已提交
429 430 431 432 433 434
    //get msgType
    //if submitblk
    pItem->executor->assign(pItem->executor->runtimeEnv, raw);
    SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
    pItem->content = content;
    //if other type, send just put into buffer
L
Liu Jicong 已提交
435
    /*pItem->content = raw;*/
L
Liu Jicong 已提交
436 437 438 439

    int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
    ASSERT(old == TQ_ITEM_PROCESS);
  }
L
Liu Jicong 已提交
440
  taosArrayDestroy(pArray);
L
Liu Jicong 已提交
441 442 443 444 445

  return 0;
}

#if 0
L
Liu Jicong 已提交
446
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
L
Liu Jicong 已提交
447
  if (!tqProtoCheck((STqMsgHead*)pMsg)) {
L
Liu Jicong 已提交
448
    // proto version invalid
L
Liu Jicong 已提交
449 450
    return -1;
  }
L
Liu Jicong 已提交
451 452 453
  int64_t   clientId = pMsg->head.clientId;
  STqGroup* pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
L
Liu Jicong 已提交
454
    // client not connect
L
Liu Jicong 已提交
455 456
    return -1;
  }
L
Liu Jicong 已提交
457
  if (pMsg->acks.ackNum != 0) {
L
Liu Jicong 已提交
458
    if (tqAck(pGroup, &pMsg->acks) != 0) {
L
Liu Jicong 已提交
459
      // ack not success
L
Liu Jicong 已提交
460 461 462 463
      return -1;
    }
  }

L
Liu Jicong 已提交
464
  STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
L
Liu Jicong 已提交
465

L
Liu Jicong 已提交
466
  if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) {
L
Liu Jicong 已提交
467
    // fetch error
L
Liu Jicong 已提交
468 469 470
    return -1;
  }

L
Liu Jicong 已提交
471
  // judge and launch new query
L
Liu Jicong 已提交
472 473 474 475
  /*if (tqSendLaunchQuery(gHandle)) {*/
  // launch query error
  /*return -1;*/
  /*}*/
L
Liu Jicong 已提交
476 477
  return 0;
}
L
Liu Jicong 已提交
478
#endif
L
Liu Jicong 已提交
479

L
Liu Jicong 已提交
480
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
L
Liu Jicong 已提交
481
  // calculate size
L
Liu Jicong 已提交
482
  int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
L
Liu Jicong 已提交
483
  if (sz > (*ppHead)->ssize) {
L
Liu Jicong 已提交
484
    void* tmpPtr = realloc(*ppHead, sz);
L
Liu Jicong 已提交
485
    if (tmpPtr == NULL) {
L
Liu Jicong 已提交
486
      free(*ppHead);
L
Liu Jicong 已提交
487
      // TODO: memory err
L
Liu Jicong 已提交
488 489 490 491
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
L
Liu Jicong 已提交
492
  }
L
Liu Jicong 已提交
493
  void* ptr = (*ppHead)->content;
L
Liu Jicong 已提交
494
  // do serialization
L
Liu Jicong 已提交
495
  *(int64_t*)ptr = pGroup->clientId;
L
Liu Jicong 已提交
496
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
497
  *(int64_t*)ptr = pGroup->cgId;
L
Liu Jicong 已提交
498
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
499
  *(int32_t*)ptr = pGroup->topicNum;
500
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
L
Liu Jicong 已提交
501 502
  if (pGroup->topicNum > 0) {
    tqSerializeListHandle(pGroup->head, ptr);
L
Liu Jicong 已提交
503 504 505 506
  }
  return 0;
}

L
Liu Jicong 已提交
507 508
void* tqSerializeListHandle(STqList* listHandle, void* ptr) {
  STqList* node = listHandle;
509
  ASSERT(node != NULL);
L
Liu Jicong 已提交
510
  while (node) {
L
Liu Jicong 已提交
511
    ptr = tqSerializeTopic(&node->topic, ptr);
L
Liu Jicong 已提交
512 513
    node = node->next;
  }
514
  return ptr;
L
Liu Jicong 已提交
515
}
516

L
Liu Jicong 已提交
517 518
void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
  *(int64_t*)ptr = pTopic->nextConsumeOffset;
L
Liu Jicong 已提交
519
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
520
  *(int64_t*)ptr = pTopic->topicId;
L
Liu Jicong 已提交
521
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
522 523 524 525
  /**(int32_t*)ptr = pTopic->head;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /**(int32_t*)ptr = pTopic->tail;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
L
Liu Jicong 已提交
526
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
L
Liu Jicong 已提交
527
    ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
L
Liu Jicong 已提交
528
  }
529
  return ptr;
L
Liu Jicong 已提交
530 531
}

L
Liu Jicong 已提交
532
void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
L
Liu Jicong 已提交
533 534
  // TODO: do we need serialize this?
  // mainly for executor
535
  return ptr;
L
Liu Jicong 已提交
536 537
}

L
Liu Jicong 已提交
538 539 540
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
  STqGroup*   gHandle = *ppGroup;
  const void* ptr = pHead->content;
L
Liu Jicong 已提交
541
  gHandle->clientId = *(int64_t*)ptr;
542 543 544 545 546 547 548
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->cgId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->ahandle = NULL;
  gHandle->topicNum = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  gHandle->head = NULL;
L
Liu Jicong 已提交
549
  STqList* node = gHandle->head;
L
Liu Jicong 已提交
550 551
  for (int i = 0; i < gHandle->topicNum; i++) {
    if (gHandle->head == NULL) {
L
Liu Jicong 已提交
552
      if ((node = malloc(sizeof(STqList))) == NULL) {
L
Liu Jicong 已提交
553
        // TODO: error
554 555
        return NULL;
      }
L
Liu Jicong 已提交
556
      node->next = NULL;
L
Liu Jicong 已提交
557
      ptr = tqDeserializeTopic(ptr, &node->topic);
558 559
      gHandle->head = node;
    } else {
L
Liu Jicong 已提交
560
      node->next = malloc(sizeof(STqList));
L
Liu Jicong 已提交
561 562
      if (node->next == NULL) {
        // TODO: error
563 564 565
        return NULL;
      }
      node->next->next = NULL;
L
Liu Jicong 已提交
566
      ptr = tqDeserializeTopic(ptr, &node->next->topic);
567 568 569 570
      node = node->next;
    }
  }
  return ptr;
L
Liu Jicong 已提交
571
}
572

L
Liu Jicong 已提交
573
const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
574
  const void* ptr = pBytes;
L
Liu Jicong 已提交
575
  topic->nextConsumeOffset = *(int64_t*)ptr;
576
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
577
  topic->topicId = *(int64_t*)ptr;
578
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
579 580 581 582
  /*topic->head = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /*topic->tail = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
L
Liu Jicong 已提交
583
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
L
Liu Jicong 已提交
584
    ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
585 586
  }
  return ptr;
L
Liu Jicong 已提交
587 588
}

L
Liu Jicong 已提交
589
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; }
L
Liu Jicong 已提交
590

L
Liu Jicong 已提交
591
// TODO: make this a macro
L
Liu Jicong 已提交
592
int tqGroupSSize(const STqGroup* gHandle) {
L
Liu Jicong 已提交
593 594
  return sizeof(int64_t) * 2  // cId + cgId
         + sizeof(int32_t)    // topicNum
L
Liu Jicong 已提交
595
         + gHandle->topicNum * tqTopicSSize();
L
Liu Jicong 已提交
596
}
597

L
Liu Jicong 已提交
598
// TODO: make this a macro
L
Liu Jicong 已提交
599
int tqTopicSSize() {
L
Liu Jicong 已提交
600 601
  return sizeof(int64_t) * 2    // nextConsumeOffset + topicId
         + sizeof(int32_t) * 2  // head + tail
L
Liu Jicong 已提交
602
         + TQ_BUFFER_SIZE * tqItemSSize();
L
Liu Jicong 已提交
603
}
604

L
Liu Jicong 已提交
605
int tqItemSSize() {
L
Liu Jicong 已提交
606 607
  // TODO: do this need serialization?
  // mainly for executor
L
Liu Jicong 已提交
608 609
  return 0;
}
610

L
Liu Jicong 已提交
611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg **ppRsp) {
  SMqCVConsumeReq* pReq = pMsg->pCont;
  int64_t reqId = pReq->reqId;
  int64_t clientId = pReq->clientId;
  int64_t offset = pReq->offset;
  int64_t blockingTime = pReq->blockingTime;

  STqClientHandle* pHandle = tqHandleGet(pTq->tqMeta, clientId);
  int8_t pos = offset % TQ_BUFFER_SIZE;
  int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
  if (old == 1) {
    // do nothing
  }
  if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
    //TODO
  }
  SWalHead *pHead = pHandle->pReadhandle->pHead;
  while (pHead->head.msgType != TDMT_VND_SUBMIT) {
    // read until find TDMT_VND_SUBMIT
  }
  SSubmitMsg *pCont = (SSubmitMsg*)&pHead->head.body;

  SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;

  void* outputData;

  atomic_store_8(&pHandle->buffer.output[pos].status, 1);
  // launch query
  // get result
  // put into 
  SMqCvConsumeRsp* pRsp;
  return 0;
}

645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
  STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
  if (pReadHandle == NULL) {
    return NULL;
  }
  pReadHandle->pMeta = pMeta;
  pReadHandle->pMsg = pMsg;
  tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
  pReadHandle->ver = -1;
  return NULL;
}

bool tqNextDataBlock(STqReadHandle* pHandle) {
  if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
    return false;
  }
  return true;
}

int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
  SMemRow row;
  int32_t sversion = pHandle->pBlock->sversion;
  SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
  pBlockInfo->numOfCols = pSchema->nCols;
  pBlockInfo->rows = pHandle->pBlock->numOfRows;
  pBlockInfo->uid = pHandle->pBlock->uid;
L
Liu Jicong 已提交
671
  //TODO: filter out unused column
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689
  return 0;
}
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
  int32_t sversion = pHandle->pBlock->sversion;
  SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
  STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
  SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
  if (pArray == NULL) {
    return NULL;
  }
  SColumnInfoData colInfo;
  int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
  colInfo.pData = malloc(sz);
  if (colInfo.pData == NULL) {
    return NULL;
  }

  for (int i = 0; i < pTschema->numOfCols; i++) {
L
Liu Jicong 已提交
690
    //TODO: filter out unused column
691 692 693 694 695 696 697
    taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
  }

  SMemRow row;
  int32_t kvIdx;
  while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
    for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
L
Liu Jicong 已提交
698
    //TODO: filter out unused column
699 700 701 702 703 704 705 706 707 708 709 710
      STColumn *pCol = schemaColAt(pTschema, i);
      void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
      //TODO: handle varlen
      memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
    }
  }
  taosArrayPush(pArray, &colInfo);
  return pArray;
}
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/
  /*return 0;*/
/*}*/