tq.c 20.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
L
Liu Jicong 已提交
17
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20
// static
// read next version data
L
Liu Jicong 已提交
21
//
L
Liu Jicong 已提交
22
// send to fetch queue
L
Liu Jicong 已提交
23
//
L
Liu Jicong 已提交
24
// handle management message
L
Liu Jicong 已提交
25
//
L
Liu Jicong 已提交
26

L
Liu Jicong 已提交
27 28 29
int tqGroupSSize(const STqGroup* pGroup);
int tqTopicSSize();
int tqItemSSize();
L
Liu Jicong 已提交
30

L
Liu Jicong 已提交
31 32 33
void* tqSerializeListHandle(STqList* listHandle, void* ptr);
void* tqSerializeTopic(STqTopic* pTopic, void* ptr);
void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39
int tqInit() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
L
Liu Jicong 已提交
40
  if (old == 1) return 0;
L
Liu Jicong 已提交
41 42 43 44 45 46 47

  tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
  return 0;
}

void tqCleanUp() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
L
Liu Jicong 已提交
48
  if (old == 0) return;
L
Liu Jicong 已提交
49 50 51 52 53
  taosTmrStop(tqMgmt.timer);
  taosTmrCleanUp(tqMgmt.timer);
}

STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) {
L
Liu Jicong 已提交
54
  STQ* pTq = malloc(sizeof(STQ));
L
Liu Jicong 已提交
55
  if (pTq == NULL) {
L
Liu Jicong 已提交
56
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
57 58
    return NULL;
  }
H
Hongze Cheng 已提交
59
  pTq->path = strdup(path);
L
Liu Jicong 已提交
60
  pTq->tqConfig = tqConfig;
L
Liu Jicong 已提交
61
  pTq->tqLogHandle = tqLogHandle;
H
more  
Hongze Cheng 已提交
62
#if 0
L
Liu Jicong 已提交
63
  pTq->tqMemRef.pAllocatorFactory = allocFac;
L
Liu Jicong 已提交
64 65
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
L
Liu Jicong 已提交
66
    // TODO: error code of buffer pool
L
Liu Jicong 已提交
67
  }
H
more  
Hongze Cheng 已提交
68
#endif
L
Liu Jicong 已提交
69
  pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
L
Liu Jicong 已提交
70
  if (pTq->tqMeta == NULL) {
L
Liu Jicong 已提交
71
    free(pTq);
L
Liu Jicong 已提交
72
#if 0
L
Liu Jicong 已提交
73
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
L
Liu Jicong 已提交
74
#endif
L
Liu Jicong 已提交
75 76
    return NULL;
  }
L
Liu Jicong 已提交
77

L
Liu Jicong 已提交
78 79
  return pTq;
}
L
Liu Jicong 已提交
80

L
Liu Jicong 已提交
81 82 83
void tqClose(STQ* pTq) {
  // TODO
}
L
Liu Jicong 已提交
84

L
Liu Jicong 已提交
85 86 87 88
static int tqProtoCheck(STqMsgHead* pMsg) {
  // TODO
  return pMsg->protoVer == 0;
}
L
Liu Jicong 已提交
89

L
Liu Jicong 已提交
90
static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
L
Liu Jicong 已提交
91
  // clean old item and move forward
L
Liu Jicong 已提交
92
  int32_t consumeOffset = pAck->consumeOffset;
L
Liu Jicong 已提交
93
  int     idx = consumeOffset % TQ_BUFFER_SIZE;
L
Liu Jicong 已提交
94 95
  ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor);
  tfree(pTopic->buffer[idx].content);
L
Liu Jicong 已提交
96 97 98
  if (1 /* TODO: need to launch new query */) {
    STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
    if (pNewQuery == NULL) {
L
Liu Jicong 已提交
99
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
100 101
      return -1;
    }
L
Liu Jicong 已提交
102 103
    // TODO: lock executor
    // TODO: read from wal and assign to src
L
Liu Jicong 已提交
104 105 106 107 108
    /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
    /*pNewQuery->exec->src = 0;*/
    /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
    /*pNewQuery->next = *ppQuery;*/
    /**ppQuery = pNewQuery;*/
L
Liu Jicong 已提交
109
  }
L
Liu Jicong 已提交
110 111 112
  return 0;
}

L
Liu Jicong 已提交
113
static int tqAck(STqGroup* pGroup, STqAcks* pAcks) {
L
Liu Jicong 已提交
114
  int32_t    ackNum = pAcks->ackNum;
L
Liu Jicong 已提交
115
  STqOneAck* acks = pAcks->acks;
L
Liu Jicong 已提交
116
  // double ptr for acks and list
L
Liu Jicong 已提交
117 118 119 120
  int          i = 0;
  STqList*     node = pGroup->head;
  int          ackCnt = 0;
  STqQueryMsg* pQuery = NULL;
L
Liu Jicong 已提交
121
  while (i < ackNum && node->next) {
L
Liu Jicong 已提交
122
    if (acks[i].topicId == node->next->topic.topicId) {
L
Liu Jicong 已提交
123
      ackCnt++;
L
Liu Jicong 已提交
124 125
      tqAckOneTopic(&node->next->topic, &acks[i], &pQuery);
    } else if (acks[i].topicId < node->next->topic.topicId) {
L
Liu Jicong 已提交
126 127 128
      i++;
    } else {
      node = node->next;
L
Liu Jicong 已提交
129 130
    }
  }
L
Liu Jicong 已提交
131 132
  if (pQuery) {
    // post message
L
Liu Jicong 已提交
133 134 135
  }
  return ackCnt;
}
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
static int tqCommitGroup(STqGroup* pGroup) {
L
Liu Jicong 已提交
138
  // persist modification into disk
L
Liu Jicong 已提交
139 140 141
  return 0;
}

L
Liu Jicong 已提交
142
int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) {
L
Liu Jicong 已提交
143
  // create in disk
L
Liu Jicong 已提交
144 145
  STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup));
  if (pGroup == NULL) {
L
Liu Jicong 已提交
146
    // TODO
L
Liu Jicong 已提交
147 148
    return -1;
  }
L
Liu Jicong 已提交
149 150
  *ppGroup = pGroup;
  memset(pGroup, 0, sizeof(STqGroup));
L
Liu Jicong 已提交
151

L
Liu Jicong 已提交
152
  pGroup->topicList = tdListNew(sizeof(STqTopic));
L
Liu Jicong 已提交
153
  if (pGroup->topicList == NULL) {
L
Liu Jicong 已提交
154 155 156 157 158
    free(pGroup);
    return -1;
  }
  *ppGroup = pGroup;

L
Liu Jicong 已提交
159 160 161
  return 0;
}

L
Liu Jicong 已提交
162 163 164 165 166
STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId);
  if (pGroup == NULL) {
    int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup);
    if (code < 0) {
L
Liu Jicong 已提交
167
      // TODO
L
Liu Jicong 已提交
168 169
      return NULL;
    }
L
Liu Jicong 已提交
170
    tqHandleMovePut(pTq->tqMeta, cId, pGroup);
L
Liu Jicong 已提交
171
  }
L
Liu Jicong 已提交
172
  ASSERT(pGroup);
L
Liu Jicong 已提交
173

L
Liu Jicong 已提交
174
  return pGroup;
L
Liu Jicong 已提交
175
}
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177 178 179 180
int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  // TODO
  return 0;
}
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
L
Liu Jicong 已提交
183
  // delete from disk
L
Liu Jicong 已提交
184 185 186
  return 0;
}

L
Liu Jicong 已提交
187 188 189
static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
  STqList* pHead = pGroup->head;
  STqList* pNode = pHead;
L
Liu Jicong 已提交
190
  int      totSize = 0;
L
Liu Jicong 已提交
191
  int      numOfMsgs = 0;
L
Liu Jicong 已提交
192
  // TODO: make it a macro
L
Liu Jicong 已提交
193
  int sizeLimit = 4 * 1024;
L
Liu Jicong 已提交
194 195 196 197

  void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
198 199
    return -1;
  }
L
Liu Jicong 已提交
200 201 202
  *pRsp = ptr;
  STqMsgContent* buffer = (*pRsp)->msgs;

L
Liu Jicong 已提交
203 204
  // iterate the list to get msgs of all topics
  // until all topic iterated or msgs over sizeLimit
L
Liu Jicong 已提交
205 206 207 208 209 210
  while (pNode->next) {
    pNode = pNode->next;
    STqTopic* pTopic = &pNode->topic;
    int       idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE;
    if (pTopic->buffer[idx].content != NULL && pTopic->buffer[idx].offset == pTopic->nextConsumeOffset) {
      totSize += pTopic->buffer[idx].size;
L
Liu Jicong 已提交
211
      if (totSize > sizeLimit) {
L
Liu Jicong 已提交
212
        void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + totSize);
L
Liu Jicong 已提交
213
        if (ptr == NULL) {
L
Liu Jicong 已提交
214 215
          totSize -= pTopic->buffer[idx].size;
          terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
216
          // return msgs already copied
L
Liu Jicong 已提交
217 218
          break;
        }
L
Liu Jicong 已提交
219 220
        *pRsp = ptr;
        break;
L
Liu Jicong 已提交
221
      }
L
Liu Jicong 已提交
222
      *((int64_t*)buffer) = pTopic->topicId;
L
Liu Jicong 已提交
223
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
224
      *((int64_t*)buffer) = pTopic->buffer[idx].size;
L
Liu Jicong 已提交
225
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
226 227 228
      memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
      buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
      numOfMsgs++;
L
Liu Jicong 已提交
229
      if (totSize > sizeLimit) {
L
Liu Jicong 已提交
230 231 232 233
        break;
      }
    }
  }
L
Liu Jicong 已提交
234 235
  (*pRsp)->bodySize = totSize;
  return numOfMsgs;
L
Liu Jicong 已提交
236 237
}

L
Liu Jicong 已提交
238
STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); }
L
Liu Jicong 已提交
239

L
Liu Jicong 已提交
240 241 242 243 244 245 246 247 248 249 250
int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) {
  if (tqQueryExecuting(bufItem->status)) {
    return 0;
  }
  bufItem->status = 1;
  // load data from wal or buffer pool
  // put into exec
  // send exec into non blocking queue
  // when query finished, put into buffer pool
  return 0;
}
L
Liu Jicong 已提交
251

L
Liu Jicong 已提交
252
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
L
Liu Jicong 已提交
253
/*return 0;*/
L
Liu Jicong 已提交
254 255
/*}*/

L
Liu Jicong 已提交
256 257 258
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
  // add reference
  // judge and launch new query
L
Liu Jicong 已提交
259 260 261
  return 0;
}

L
Liu Jicong 已提交
262
int tqCommit(STQ* pTq) {
L
Liu Jicong 已提交
263
  // do nothing
L
Liu Jicong 已提交
264 265 266
  return 0;
}

L
Liu Jicong 已提交
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) {
  int code;
  memset(pTopic->buffer, 0, sizeof(pTopic->buffer));
  // launch query
  for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) {
    int pos = i % TQ_BUFFER_SIZE;
    code = tqSendLaunchQuery(&pTopic->buffer[pos], offset);
    if (code < 0) {
      // TODO: error handling
    }
  }
  // set offset
  pTopic->nextConsumeOffset = offset;
  pTopic->floatingCursor = offset;
  return 0;
}

STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) {
  // TODO
  return NULL;
}

int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
  int       code;
  int64_t   clientId = pMsg->head.clientId;
  int64_t   topicId = pMsg->topicId;
  int64_t   offset = pMsg->offset;
  STqGroup* gHandle = tqGetGroup(pTq, clientId);
  if (gHandle == NULL) {
    // client not connect
    return -1;
  }
  STqTopic* topicHandle = tqFindTopic(gHandle, topicId);
  if (topicHandle == NULL) {
    return -1;
  }
  if (pMsg->offset == topicHandle->nextConsumeOffset) {
    return 0;
  }
  // TODO: check log last version

  code = tqBufferSetOffset(topicHandle, offset);
  if (code < 0) {
    // set error code
    return -1;
  }

L
Liu Jicong 已提交
314 315 316
  return 0;
}

L
Liu Jicong 已提交
317 318
// temporary
int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
L
Liu Jicong 已提交
319 320 321 322 323 324
  int64_t   clientId = pMsg->head.clientId;
  STqGroup* pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }
L
Liu Jicong 已提交
325 326 327 328 329 330 331
  pGroup->rspHandle.handle = pRsp->handle;
  pGroup->rspHandle.ahandle = pRsp->ahandle;

  return 0;
}

int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
L
Liu Jicong 已提交
332 333 334
  STqConsumeReq* pMsg = pReq->pCont;
  int64_t        clientId = pMsg->head.clientId;
  STqGroup*      pGroup = tqGetGroup(pTq, clientId);
L
Liu Jicong 已提交
335 336 337 338 339 340 341 342 343 344 345
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }

  SList* topicList = pGroup->topicList;

  int totSize = 0;
  int numOfMsgs = 0;
  int sizeLimit = 4096;

L
Liu Jicong 已提交
346 347
  STqConsumeRsp* pCsmRsp = (*pRsp)->pCont;
  void*          ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
L
Liu Jicong 已提交
348 349 350 351 352 353 354 355 356 357
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  (*pRsp)->pCont = ptr;

  SListIter iter;
  tdListInitIter(topicList, &iter, TD_LIST_FORWARD);

  STqMsgContent* buffer = NULL;
L
Liu Jicong 已提交
358
  SArray*        pArray = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
359

L
Liu Jicong 已提交
360 361 362 363
  SListNode* pn;
  while ((pn = tdListNext(&iter)) != NULL) {
    STqTopic*   pTopic = *(STqTopic**)pn->data;
    int         idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
L
Liu Jicong 已提交
364 365
    STqMsgItem* pItem = &pTopic->buffer[idx];
    if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
L
Liu Jicong 已提交
366 367
      if (pItem->status == TQ_ITEM_READY) {
        // if has data
L
Liu Jicong 已提交
368 369 370 371 372 373 374 375 376 377 378 379
        totSize += pTopic->buffer[idx].size;
        if (totSize > sizeLimit) {
          void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
          if (ptr == NULL) {
            totSize -= pTopic->buffer[idx].size;
            terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
            // return msgs already copied
            break;
          }
          (*pRsp)->pCont = ptr;
          break;
        }
L
Liu Jicong 已提交
380
        *((int64_t*)buffer) = htobe64(pTopic->topicId);
L
Liu Jicong 已提交
381
        buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
382
        *((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size);
L
Liu Jicong 已提交
383 384 385 386 387 388 389
        buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
        memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
        buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
        numOfMsgs++;
        if (totSize > sizeLimit) {
          break;
        }
L
Liu Jicong 已提交
390 391
      } else if (pItem->status == TQ_ITEM_PROCESS) {
        // if not have data but in process
L
Liu Jicong 已提交
392

L
Liu Jicong 已提交
393 394
      } else if (pItem->status == TQ_ITEM_EMPTY) {
        // if not have data and not in process
L
Liu Jicong 已提交
395
        int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
L
Liu Jicong 已提交
396
        if (old != TQ_ITEM_EMPTY) {
L
Liu Jicong 已提交
397 398 399 400 401 402 403 404 405 406
          continue;
        }
        pItem->offset = pTopic->floatingCursor;
        taosArrayPush(pArray, &pItem);
      } else {
        ASSERT(0);
      }
    }
  }

L
Liu Jicong 已提交
407 408 409 410 411 412 413 414 415 416 417
  if (numOfMsgs > 0) {
    // set code and other msg
    rpcSendResponse(*pRsp);
  } else {
    // most recent data has been fetched

    // enable timer for blocking wait
    // once new data written when waiting, launch query and rsp
  }

  // fetched a num of msgs, rpc response
L
Liu Jicong 已提交
418
  for (int i = 0; i < pArray->size; i++) {
L
Liu Jicong 已提交
419 420
    STqMsgItem* pItem = taosArrayGet(pArray, i);

L
Liu Jicong 已提交
421
    // read from wal
L
Liu Jicong 已提交
422 423 424
    void* raw = NULL;
    /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
    int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);
L
Liu Jicong 已提交
425 426
    if (code < 0) {
      // TODO: error
L
Liu Jicong 已提交
427
    }
L
Liu Jicong 已提交
428 429
    // get msgType
    // if submitblk
L
Liu Jicong 已提交
430 431 432
    pItem->executor->assign(pItem->executor->runtimeEnv, raw);
    SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
    pItem->content = content;
L
Liu Jicong 已提交
433
    // if other type, send just put into buffer
L
Liu Jicong 已提交
434
    /*pItem->content = raw;*/
L
Liu Jicong 已提交
435 436 437 438

    int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
    ASSERT(old == TQ_ITEM_PROCESS);
  }
L
Liu Jicong 已提交
439
  taosArrayDestroy(pArray);
L
Liu Jicong 已提交
440 441 442 443 444

  return 0;
}

#if 0
L
Liu Jicong 已提交
445
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
L
Liu Jicong 已提交
446
  if (!tqProtoCheck((STqMsgHead*)pMsg)) {
L
Liu Jicong 已提交
447
    // proto version invalid
L
Liu Jicong 已提交
448 449
    return -1;
  }
L
Liu Jicong 已提交
450 451 452
  int64_t   clientId = pMsg->head.clientId;
  STqGroup* pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
L
Liu Jicong 已提交
453
    // client not connect
L
Liu Jicong 已提交
454 455
    return -1;
  }
L
Liu Jicong 已提交
456
  if (pMsg->acks.ackNum != 0) {
L
Liu Jicong 已提交
457
    if (tqAck(pGroup, &pMsg->acks) != 0) {
L
Liu Jicong 已提交
458
      // ack not success
L
Liu Jicong 已提交
459 460 461 462
      return -1;
    }
  }

L
Liu Jicong 已提交
463
  STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
L
Liu Jicong 已提交
464

L
Liu Jicong 已提交
465
  if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) {
L
Liu Jicong 已提交
466
    // fetch error
L
Liu Jicong 已提交
467 468 469
    return -1;
  }

L
Liu Jicong 已提交
470
  // judge and launch new query
L
Liu Jicong 已提交
471 472 473 474
  /*if (tqSendLaunchQuery(gHandle)) {*/
  // launch query error
  /*return -1;*/
  /*}*/
L
Liu Jicong 已提交
475 476
  return 0;
}
L
Liu Jicong 已提交
477
#endif
L
Liu Jicong 已提交
478

L
Liu Jicong 已提交
479
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
L
Liu Jicong 已提交
480
  // calculate size
L
Liu Jicong 已提交
481
  int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
L
Liu Jicong 已提交
482
  if (sz > (*ppHead)->ssize) {
L
Liu Jicong 已提交
483
    void* tmpPtr = realloc(*ppHead, sz);
L
Liu Jicong 已提交
484
    if (tmpPtr == NULL) {
L
Liu Jicong 已提交
485
      free(*ppHead);
L
Liu Jicong 已提交
486
      // TODO: memory err
L
Liu Jicong 已提交
487 488 489 490
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
L
Liu Jicong 已提交
491
  }
L
Liu Jicong 已提交
492
  void* ptr = (*ppHead)->content;
L
Liu Jicong 已提交
493
  // do serialization
L
Liu Jicong 已提交
494
  *(int64_t*)ptr = pGroup->clientId;
L
Liu Jicong 已提交
495
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
496
  *(int64_t*)ptr = pGroup->cgId;
L
Liu Jicong 已提交
497
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
498
  *(int32_t*)ptr = pGroup->topicNum;
499
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
L
Liu Jicong 已提交
500 501
  if (pGroup->topicNum > 0) {
    tqSerializeListHandle(pGroup->head, ptr);
L
Liu Jicong 已提交
502 503 504 505
  }
  return 0;
}

L
Liu Jicong 已提交
506 507
void* tqSerializeListHandle(STqList* listHandle, void* ptr) {
  STqList* node = listHandle;
508
  ASSERT(node != NULL);
L
Liu Jicong 已提交
509
  while (node) {
L
Liu Jicong 已提交
510
    ptr = tqSerializeTopic(&node->topic, ptr);
L
Liu Jicong 已提交
511 512
    node = node->next;
  }
513
  return ptr;
L
Liu Jicong 已提交
514
}
515

L
Liu Jicong 已提交
516 517
void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
  *(int64_t*)ptr = pTopic->nextConsumeOffset;
L
Liu Jicong 已提交
518
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
519
  *(int64_t*)ptr = pTopic->topicId;
L
Liu Jicong 已提交
520
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
521 522 523 524
  /**(int32_t*)ptr = pTopic->head;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /**(int32_t*)ptr = pTopic->tail;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
L
Liu Jicong 已提交
525
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
L
Liu Jicong 已提交
526
    ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
L
Liu Jicong 已提交
527
  }
528
  return ptr;
L
Liu Jicong 已提交
529 530
}

L
Liu Jicong 已提交
531
void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
L
Liu Jicong 已提交
532 533
  // TODO: do we need serialize this?
  // mainly for executor
534
  return ptr;
L
Liu Jicong 已提交
535 536
}

L
Liu Jicong 已提交
537 538 539
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
  STqGroup*   gHandle = *ppGroup;
  const void* ptr = pHead->content;
L
Liu Jicong 已提交
540
  gHandle->clientId = *(int64_t*)ptr;
541 542 543 544 545 546 547
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->cgId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->ahandle = NULL;
  gHandle->topicNum = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  gHandle->head = NULL;
L
Liu Jicong 已提交
548
  STqList* node = gHandle->head;
L
Liu Jicong 已提交
549 550
  for (int i = 0; i < gHandle->topicNum; i++) {
    if (gHandle->head == NULL) {
L
Liu Jicong 已提交
551
      if ((node = malloc(sizeof(STqList))) == NULL) {
L
Liu Jicong 已提交
552
        // TODO: error
553 554
        return NULL;
      }
L
Liu Jicong 已提交
555
      node->next = NULL;
L
Liu Jicong 已提交
556
      ptr = tqDeserializeTopic(ptr, &node->topic);
557 558
      gHandle->head = node;
    } else {
L
Liu Jicong 已提交
559
      node->next = malloc(sizeof(STqList));
L
Liu Jicong 已提交
560 561
      if (node->next == NULL) {
        // TODO: error
562 563 564
        return NULL;
      }
      node->next->next = NULL;
L
Liu Jicong 已提交
565
      ptr = tqDeserializeTopic(ptr, &node->next->topic);
566 567 568 569
      node = node->next;
    }
  }
  return ptr;
L
Liu Jicong 已提交
570
}
571

L
Liu Jicong 已提交
572
const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
573
  const void* ptr = pBytes;
L
Liu Jicong 已提交
574
  topic->nextConsumeOffset = *(int64_t*)ptr;
575
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
576
  topic->topicId = *(int64_t*)ptr;
577
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
578 579 580 581
  /*topic->head = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /*topic->tail = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
L
Liu Jicong 已提交
582
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
L
Liu Jicong 已提交
583
    ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
584 585
  }
  return ptr;
L
Liu Jicong 已提交
586 587
}

L
Liu Jicong 已提交
588
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; }
L
Liu Jicong 已提交
589

L
Liu Jicong 已提交
590
// TODO: make this a macro
L
Liu Jicong 已提交
591
int tqGroupSSize(const STqGroup* gHandle) {
L
Liu Jicong 已提交
592 593
  return sizeof(int64_t) * 2  // cId + cgId
         + sizeof(int32_t)    // topicNum
L
Liu Jicong 已提交
594
         + gHandle->topicNum * tqTopicSSize();
L
Liu Jicong 已提交
595
}
596

L
Liu Jicong 已提交
597
// TODO: make this a macro
L
Liu Jicong 已提交
598
int tqTopicSSize() {
L
Liu Jicong 已提交
599 600
  return sizeof(int64_t) * 2    // nextConsumeOffset + topicId
         + sizeof(int32_t) * 2  // head + tail
L
Liu Jicong 已提交
601
         + TQ_BUFFER_SIZE * tqItemSSize();
L
Liu Jicong 已提交
602
}
603

L
Liu Jicong 已提交
604
int tqItemSSize() {
L
Liu Jicong 已提交
605 606
  // TODO: do this need serialization?
  // mainly for executor
L
Liu Jicong 已提交
607 608
  return 0;
}
609

L
Liu Jicong 已提交
610
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
L
Liu Jicong 已提交
611
  SMqCVConsumeReq* pReq = pMsg->pCont;
L
Liu Jicong 已提交
612 613 614 615
  int64_t          reqId = pReq->reqId;
  int64_t          consumerId = pReq->consumerId;
  int64_t          offset = pReq->offset;
  int64_t          blockingTime = pReq->blockingTime;
L
Liu Jicong 已提交
616

L
Liu Jicong 已提交
617 618
  STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  int sz = taosArrayGetSize(pConsumer->topics);
L
Liu Jicong 已提交
619

L
Liu Jicong 已提交
620 621
  for (int i = 0 ; i < sz; i++) {
    STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i);
L
Liu Jicong 已提交
622

L
Liu Jicong 已提交
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643
    int8_t           pos = offset % TQ_BUFFER_SIZE;
    int8_t           old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
    if (old == 1) {
      // do nothing
    }
    if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
      // TODO
    }
    SWalHead* pHead = pHandle->pReadhandle->pHead;
    while (pHead->head.msgType != TDMT_VND_SUBMIT) {
      // read until find TDMT_VND_SUBMIT
    }
    SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;

    SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;

    void* outputData;
    atomic_store_8(&pHandle->buffer.output[pos].status, 1);

    // put output into rsp
  }
L
Liu Jicong 已提交
644 645 646 647 648 649 650

  // launch query
  // get result
  SMqCvConsumeRsp* pRsp;
  return 0;
}

L
Liu Jicong 已提交
651
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) {
652 653 654 655 656 657 658 659 660 661 662 663
  STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
  if (pReadHandle == NULL) {
    return NULL;
  }
  pReadHandle->pMeta = pMeta;
  pReadHandle->pMsg = pMsg;
  tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
  pReadHandle->ver = -1;
  return NULL;
}

bool tqNextDataBlock(STqReadHandle* pHandle) {
L
Liu Jicong 已提交
664
  if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
665 666 667 668 669 670
    return false;
  }
  return true;
}

int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
L
Liu Jicong 已提交
671 672
  SMemRow         row;
  int32_t         sversion = pHandle->pBlock->sversion;
673 674 675 676
  SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
  pBlockInfo->numOfCols = pSchema->nCols;
  pBlockInfo->rows = pHandle->pBlock->numOfRows;
  pBlockInfo->uid = pHandle->pBlock->uid;
L
Liu Jicong 已提交
677
  // TODO: filter out unused column
678 679
  return 0;
}
L
Liu Jicong 已提交
680 681
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
  int32_t         sversion = pHandle->pBlock->sversion;
682
  SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
L
Liu Jicong 已提交
683 684
  STSchema*       pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
  SArray*         pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
685 686 687 688
  if (pArray == NULL) {
    return NULL;
  }
  SColumnInfoData colInfo;
L
Liu Jicong 已提交
689
  int             sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
690 691 692 693 694 695
  colInfo.pData = malloc(sz);
  if (colInfo.pData == NULL) {
    return NULL;
  }

  for (int i = 0; i < pTschema->numOfCols; i++) {
L
Liu Jicong 已提交
696
    // TODO: filter out unused column
697 698 699 700 701 702 703
    taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
  }

  SMemRow row;
  int32_t kvIdx;
  while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
    for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
L
Liu Jicong 已提交
704 705
      // TODO: filter out unused column
      STColumn* pCol = schemaColAt(pTschema, i);
706
      void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
L
Liu Jicong 已提交
707
      // TODO: handle varlen
708 709 710 711 712 713
      memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
    }
  }
  taosArrayPush(pArray, &colInfo);
  return pArray;
}
L
Liu Jicong 已提交
714 715 716
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
 * status) {*/
/*return 0;*/
717
/*}*/