tq.c 21.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
L
Liu Jicong 已提交
17
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20
// static
// read next version data
L
Liu Jicong 已提交
21
//
L
Liu Jicong 已提交
22
// send to fetch queue
L
Liu Jicong 已提交
23
//
L
Liu Jicong 已提交
24
// handle management message
L
Liu Jicong 已提交
25
//
L
Liu Jicong 已提交
26

L
Liu Jicong 已提交
27 28 29
int tqGroupSSize(const STqGroup* pGroup);
int tqTopicSSize();
int tqItemSSize();
L
Liu Jicong 已提交
30

L
Liu Jicong 已提交
31 32 33
void* tqSerializeListHandle(STqList* listHandle, void* ptr);
void* tqSerializeTopic(STqTopic* pTopic, void* ptr);
void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35 36
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38 39
int tqInit() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
L
Liu Jicong 已提交
40
  if (old == 1) return 0;
L
Liu Jicong 已提交
41 42 43 44 45 46 47

  tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
  return 0;
}

void tqCleanUp() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
L
Liu Jicong 已提交
48
  if (old == 0) return;
L
Liu Jicong 已提交
49 50 51 52
  taosTmrStop(tqMgmt.timer);
  taosTmrCleanUp(tqMgmt.timer);
}

L
Liu Jicong 已提交
53
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
L
Liu Jicong 已提交
54
  STQ* pTq = malloc(sizeof(STQ));
L
Liu Jicong 已提交
55
  if (pTq == NULL) {
L
Liu Jicong 已提交
56
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
57 58
    return NULL;
  }
H
Hongze Cheng 已提交
59
  pTq->path = strdup(path);
L
Liu Jicong 已提交
60
  pTq->tqConfig = tqConfig;
L
Liu Jicong 已提交
61
#if 0
L
Liu Jicong 已提交
62
  pTq->tqMemRef.pAllocatorFactory = allocFac;
L
Liu Jicong 已提交
63 64
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
L
Liu Jicong 已提交
65
    // TODO: error code of buffer pool
L
Liu Jicong 已提交
66
  }
L
Liu Jicong 已提交
67
#endif
L
Liu Jicong 已提交
68
  pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
L
Liu Jicong 已提交
69
  if (pTq->tqMeta == NULL) {
L
Liu Jicong 已提交
70
    free(pTq);
L
Liu Jicong 已提交
71 72 73
#if 0
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
L
Liu Jicong 已提交
74 75
    return NULL;
  }
L
Liu Jicong 已提交
76

L
Liu Jicong 已提交
77 78
  return pTq;
}
L
Liu Jicong 已提交
79

L
Liu Jicong 已提交
80 81 82
void tqClose(STQ* pTq) {
  // TODO
}
L
Liu Jicong 已提交
83

L
Liu Jicong 已提交
84 85 86 87
static int tqProtoCheck(STqMsgHead* pMsg) {
  // TODO
  return pMsg->protoVer == 0;
}
L
Liu Jicong 已提交
88

L
Liu Jicong 已提交
89
static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
L
Liu Jicong 已提交
90
  // clean old item and move forward
L
Liu Jicong 已提交
91
  int32_t consumeOffset = pAck->consumeOffset;
L
Liu Jicong 已提交
92
  int     idx = consumeOffset % TQ_BUFFER_SIZE;
L
Liu Jicong 已提交
93 94
  ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor);
  tfree(pTopic->buffer[idx].content);
L
Liu Jicong 已提交
95 96 97
  if (1 /* TODO: need to launch new query */) {
    STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
    if (pNewQuery == NULL) {
L
Liu Jicong 已提交
98
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
99 100
      return -1;
    }
L
Liu Jicong 已提交
101 102
    // TODO: lock executor
    // TODO: read from wal and assign to src
L
Liu Jicong 已提交
103 104 105 106 107
    /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
    /*pNewQuery->exec->src = 0;*/
    /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
    /*pNewQuery->next = *ppQuery;*/
    /**ppQuery = pNewQuery;*/
L
Liu Jicong 已提交
108
  }
L
Liu Jicong 已提交
109 110 111
  return 0;
}

L
Liu Jicong 已提交
112
static int tqAck(STqGroup* pGroup, STqAcks* pAcks) {
L
Liu Jicong 已提交
113
  int32_t    ackNum = pAcks->ackNum;
L
Liu Jicong 已提交
114
  STqOneAck* acks = pAcks->acks;
L
Liu Jicong 已提交
115
  // double ptr for acks and list
L
Liu Jicong 已提交
116 117 118 119
  int          i = 0;
  STqList*     node = pGroup->head;
  int          ackCnt = 0;
  STqQueryMsg* pQuery = NULL;
L
Liu Jicong 已提交
120
  while (i < ackNum && node->next) {
L
Liu Jicong 已提交
121
    if (acks[i].topicId == node->next->topic.topicId) {
L
Liu Jicong 已提交
122
      ackCnt++;
L
Liu Jicong 已提交
123 124
      tqAckOneTopic(&node->next->topic, &acks[i], &pQuery);
    } else if (acks[i].topicId < node->next->topic.topicId) {
L
Liu Jicong 已提交
125 126 127
      i++;
    } else {
      node = node->next;
L
Liu Jicong 已提交
128 129
    }
  }
L
Liu Jicong 已提交
130 131
  if (pQuery) {
    // post message
L
Liu Jicong 已提交
132 133 134
  }
  return ackCnt;
}
L
Liu Jicong 已提交
135

L
Liu Jicong 已提交
136
static int tqCommitGroup(STqGroup* pGroup) {
L
Liu Jicong 已提交
137
  // persist modification into disk
L
Liu Jicong 已提交
138 139 140
  return 0;
}

L
Liu Jicong 已提交
141
int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) {
L
Liu Jicong 已提交
142
  // create in disk
L
Liu Jicong 已提交
143 144
  STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup));
  if (pGroup == NULL) {
L
Liu Jicong 已提交
145
    // TODO
L
Liu Jicong 已提交
146 147
    return -1;
  }
L
Liu Jicong 已提交
148 149
  *ppGroup = pGroup;
  memset(pGroup, 0, sizeof(STqGroup));
L
Liu Jicong 已提交
150

L
Liu Jicong 已提交
151
  pGroup->topicList = tdListNew(sizeof(STqTopic));
L
Liu Jicong 已提交
152
  if (pGroup->topicList == NULL) {
L
Liu Jicong 已提交
153 154 155 156 157
    free(pGroup);
    return -1;
  }
  *ppGroup = pGroup;

L
Liu Jicong 已提交
158 159 160
  return 0;
}

L
Liu Jicong 已提交
161 162 163 164 165
STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId);
  if (pGroup == NULL) {
    int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup);
    if (code < 0) {
L
Liu Jicong 已提交
166
      // TODO
L
Liu Jicong 已提交
167 168
      return NULL;
    }
L
Liu Jicong 已提交
169
    tqHandleMovePut(pTq->tqMeta, cId, pGroup);
L
Liu Jicong 已提交
170
  }
L
Liu Jicong 已提交
171
  ASSERT(pGroup);
L
Liu Jicong 已提交
172

L
Liu Jicong 已提交
173
  return pGroup;
L
Liu Jicong 已提交
174
}
L
Liu Jicong 已提交
175

L
Liu Jicong 已提交
176 177 178 179
int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  // TODO
  return 0;
}
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181
int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
L
Liu Jicong 已提交
182
  // delete from disk
L
Liu Jicong 已提交
183 184 185
  return 0;
}

L
Liu Jicong 已提交
186 187 188
static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
  STqList* pHead = pGroup->head;
  STqList* pNode = pHead;
L
Liu Jicong 已提交
189
  int      totSize = 0;
L
Liu Jicong 已提交
190
  int      numOfMsgs = 0;
L
Liu Jicong 已提交
191
  // TODO: make it a macro
L
Liu Jicong 已提交
192
  int sizeLimit = 4 * 1024;
L
Liu Jicong 已提交
193 194 195 196

  void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
197 198
    return -1;
  }
L
Liu Jicong 已提交
199 200 201
  *pRsp = ptr;
  STqMsgContent* buffer = (*pRsp)->msgs;

L
Liu Jicong 已提交
202 203
  // iterate the list to get msgs of all topics
  // until all topic iterated or msgs over sizeLimit
L
Liu Jicong 已提交
204 205 206 207 208 209
  while (pNode->next) {
    pNode = pNode->next;
    STqTopic* pTopic = &pNode->topic;
    int       idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE;
    if (pTopic->buffer[idx].content != NULL && pTopic->buffer[idx].offset == pTopic->nextConsumeOffset) {
      totSize += pTopic->buffer[idx].size;
L
Liu Jicong 已提交
210
      if (totSize > sizeLimit) {
L
Liu Jicong 已提交
211
        void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + totSize);
L
Liu Jicong 已提交
212
        if (ptr == NULL) {
L
Liu Jicong 已提交
213 214
          totSize -= pTopic->buffer[idx].size;
          terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
215
          // return msgs already copied
L
Liu Jicong 已提交
216 217
          break;
        }
L
Liu Jicong 已提交
218 219
        *pRsp = ptr;
        break;
L
Liu Jicong 已提交
220
      }
L
Liu Jicong 已提交
221
      *((int64_t*)buffer) = pTopic->topicId;
L
Liu Jicong 已提交
222
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
223
      *((int64_t*)buffer) = pTopic->buffer[idx].size;
L
Liu Jicong 已提交
224
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
225 226 227
      memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
      buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
      numOfMsgs++;
L
Liu Jicong 已提交
228
      if (totSize > sizeLimit) {
L
Liu Jicong 已提交
229 230 231 232
        break;
      }
    }
  }
L
Liu Jicong 已提交
233 234
  (*pRsp)->bodySize = totSize;
  return numOfMsgs;
L
Liu Jicong 已提交
235 236
}

L
Liu Jicong 已提交
237
STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); }
L
Liu Jicong 已提交
238

L
Liu Jicong 已提交
239 240 241 242 243 244 245 246 247 248 249
int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) {
  if (tqQueryExecuting(bufItem->status)) {
    return 0;
  }
  bufItem->status = 1;
  // load data from wal or buffer pool
  // put into exec
  // send exec into non blocking queue
  // when query finished, put into buffer pool
  return 0;
}
L
Liu Jicong 已提交
250

L
Liu Jicong 已提交
251
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
L
Liu Jicong 已提交
252
/*return 0;*/
L
Liu Jicong 已提交
253 254
/*}*/

L
Liu Jicong 已提交
255 256 257
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
  // add reference
  // judge and launch new query
L
Liu Jicong 已提交
258 259 260
  return 0;
}

L
Liu Jicong 已提交
261
int tqCommit(STQ* pTq) {
L
Liu Jicong 已提交
262
  // do nothing
L
Liu Jicong 已提交
263 264 265
  return 0;
}

L
Liu Jicong 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) {
  int code;
  memset(pTopic->buffer, 0, sizeof(pTopic->buffer));
  // launch query
  for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) {
    int pos = i % TQ_BUFFER_SIZE;
    code = tqSendLaunchQuery(&pTopic->buffer[pos], offset);
    if (code < 0) {
      // TODO: error handling
    }
  }
  // set offset
  pTopic->nextConsumeOffset = offset;
  pTopic->floatingCursor = offset;
  return 0;
}

STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) {
  // TODO
  return NULL;
}

int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
  int       code;
  int64_t   clientId = pMsg->head.clientId;
  int64_t   topicId = pMsg->topicId;
  int64_t   offset = pMsg->offset;
  STqGroup* gHandle = tqGetGroup(pTq, clientId);
  if (gHandle == NULL) {
    // client not connect
    return -1;
  }
  STqTopic* topicHandle = tqFindTopic(gHandle, topicId);
  if (topicHandle == NULL) {
    return -1;
  }
  if (pMsg->offset == topicHandle->nextConsumeOffset) {
    return 0;
  }
  // TODO: check log last version

  code = tqBufferSetOffset(topicHandle, offset);
  if (code < 0) {
    // set error code
    return -1;
  }

L
Liu Jicong 已提交
313 314 315
  return 0;
}

L
Liu Jicong 已提交
316 317
// temporary
int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
L
Liu Jicong 已提交
318 319 320 321 322 323
  int64_t   clientId = pMsg->head.clientId;
  STqGroup* pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }
L
Liu Jicong 已提交
324 325 326 327 328 329 330
  pGroup->rspHandle.handle = pRsp->handle;
  pGroup->rspHandle.ahandle = pRsp->ahandle;

  return 0;
}

int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
L
Liu Jicong 已提交
331 332 333
  STqConsumeReq* pMsg = pReq->pCont;
  int64_t        clientId = pMsg->head.clientId;
  STqGroup*      pGroup = tqGetGroup(pTq, clientId);
L
Liu Jicong 已提交
334 335 336 337 338 339 340 341 342 343 344
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }

  SList* topicList = pGroup->topicList;

  int totSize = 0;
  int numOfMsgs = 0;
  int sizeLimit = 4096;

L
Liu Jicong 已提交
345 346
  STqConsumeRsp* pCsmRsp = (*pRsp)->pCont;
  void*          ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
L
Liu Jicong 已提交
347 348 349 350 351 352 353 354 355 356
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  (*pRsp)->pCont = ptr;

  SListIter iter;
  tdListInitIter(topicList, &iter, TD_LIST_FORWARD);

  STqMsgContent* buffer = NULL;
L
Liu Jicong 已提交
357
  SArray*        pArray = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
358

L
Liu Jicong 已提交
359 360 361 362
  SListNode* pn;
  while ((pn = tdListNext(&iter)) != NULL) {
    STqTopic*   pTopic = *(STqTopic**)pn->data;
    int         idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
L
Liu Jicong 已提交
363 364
    STqMsgItem* pItem = &pTopic->buffer[idx];
    if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
L
Liu Jicong 已提交
365 366
      if (pItem->status == TQ_ITEM_READY) {
        // if has data
L
Liu Jicong 已提交
367 368 369 370 371 372 373 374 375 376 377 378
        totSize += pTopic->buffer[idx].size;
        if (totSize > sizeLimit) {
          void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
          if (ptr == NULL) {
            totSize -= pTopic->buffer[idx].size;
            terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
            // return msgs already copied
            break;
          }
          (*pRsp)->pCont = ptr;
          break;
        }
L
Liu Jicong 已提交
379
        *((int64_t*)buffer) = htobe64(pTopic->topicId);
L
Liu Jicong 已提交
380
        buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
L
Liu Jicong 已提交
381
        *((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size);
L
Liu Jicong 已提交
382 383 384 385 386 387 388
        buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
        memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
        buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
        numOfMsgs++;
        if (totSize > sizeLimit) {
          break;
        }
L
Liu Jicong 已提交
389 390
      } else if (pItem->status == TQ_ITEM_PROCESS) {
        // if not have data but in process
L
Liu Jicong 已提交
391

L
Liu Jicong 已提交
392 393
      } else if (pItem->status == TQ_ITEM_EMPTY) {
        // if not have data and not in process
L
Liu Jicong 已提交
394
        int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
L
Liu Jicong 已提交
395
        if (old != TQ_ITEM_EMPTY) {
L
Liu Jicong 已提交
396 397 398 399 400 401 402 403 404 405
          continue;
        }
        pItem->offset = pTopic->floatingCursor;
        taosArrayPush(pArray, &pItem);
      } else {
        ASSERT(0);
      }
    }
  }

L
Liu Jicong 已提交
406 407 408 409 410 411 412 413 414 415 416
  if (numOfMsgs > 0) {
    // set code and other msg
    rpcSendResponse(*pRsp);
  } else {
    // most recent data has been fetched

    // enable timer for blocking wait
    // once new data written when waiting, launch query and rsp
  }

  // fetched a num of msgs, rpc response
L
Liu Jicong 已提交
417
  for (int i = 0; i < pArray->size; i++) {
L
Liu Jicong 已提交
418 419
    STqMsgItem* pItem = taosArrayGet(pArray, i);

L
Liu Jicong 已提交
420
    // read from wal
L
Liu Jicong 已提交
421 422
    void* raw = NULL;
    /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
L
Liu Jicong 已提交
423 424
    /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
    /*if (code < 0) {*/
L
Liu Jicong 已提交
425
      // TODO: error
L
Liu Jicong 已提交
426
    /*}*/
L
Liu Jicong 已提交
427 428
    // get msgType
    // if submitblk
L
Liu Jicong 已提交
429 430 431
    pItem->executor->assign(pItem->executor->runtimeEnv, raw);
    SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
    pItem->content = content;
L
Liu Jicong 已提交
432
    // if other type, send just put into buffer
L
Liu Jicong 已提交
433
    /*pItem->content = raw;*/
L
Liu Jicong 已提交
434 435 436 437

    int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
    ASSERT(old == TQ_ITEM_PROCESS);
  }
L
Liu Jicong 已提交
438
  taosArrayDestroy(pArray);
L
Liu Jicong 已提交
439 440 441 442 443

  return 0;
}

#if 0
L
Liu Jicong 已提交
444
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
L
Liu Jicong 已提交
445
  if (!tqProtoCheck((STqMsgHead*)pMsg)) {
L
Liu Jicong 已提交
446
    // proto version invalid
L
Liu Jicong 已提交
447 448
    return -1;
  }
L
Liu Jicong 已提交
449 450 451
  int64_t   clientId = pMsg->head.clientId;
  STqGroup* pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
L
Liu Jicong 已提交
452
    // client not connect
L
Liu Jicong 已提交
453 454
    return -1;
  }
L
Liu Jicong 已提交
455
  if (pMsg->acks.ackNum != 0) {
L
Liu Jicong 已提交
456
    if (tqAck(pGroup, &pMsg->acks) != 0) {
L
Liu Jicong 已提交
457
      // ack not success
L
Liu Jicong 已提交
458 459 460 461
      return -1;
    }
  }

L
Liu Jicong 已提交
462
  STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
L
Liu Jicong 已提交
463

L
Liu Jicong 已提交
464
  if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) {
L
Liu Jicong 已提交
465
    // fetch error
L
Liu Jicong 已提交
466 467 468
    return -1;
  }

L
Liu Jicong 已提交
469
  // judge and launch new query
L
Liu Jicong 已提交
470 471 472 473
  /*if (tqSendLaunchQuery(gHandle)) {*/
  // launch query error
  /*return -1;*/
  /*}*/
L
Liu Jicong 已提交
474 475
  return 0;
}
L
Liu Jicong 已提交
476
#endif
L
Liu Jicong 已提交
477

L
Liu Jicong 已提交
478
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
L
Liu Jicong 已提交
479
  // calculate size
L
Liu Jicong 已提交
480
  int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
L
Liu Jicong 已提交
481
  if (sz > (*ppHead)->ssize) {
L
Liu Jicong 已提交
482
    void* tmpPtr = realloc(*ppHead, sz);
L
Liu Jicong 已提交
483
    if (tmpPtr == NULL) {
L
Liu Jicong 已提交
484
      free(*ppHead);
L
Liu Jicong 已提交
485
      // TODO: memory err
L
Liu Jicong 已提交
486 487 488 489
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
L
Liu Jicong 已提交
490
  }
L
Liu Jicong 已提交
491
  void* ptr = (*ppHead)->content;
L
Liu Jicong 已提交
492
  // do serialization
L
Liu Jicong 已提交
493
  *(int64_t*)ptr = pGroup->clientId;
L
Liu Jicong 已提交
494
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
495
  *(int64_t*)ptr = pGroup->cgId;
L
Liu Jicong 已提交
496
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
497
  *(int32_t*)ptr = pGroup->topicNum;
498
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
L
Liu Jicong 已提交
499 500
  if (pGroup->topicNum > 0) {
    tqSerializeListHandle(pGroup->head, ptr);
L
Liu Jicong 已提交
501 502 503 504
  }
  return 0;
}

L
Liu Jicong 已提交
505 506
void* tqSerializeListHandle(STqList* listHandle, void* ptr) {
  STqList* node = listHandle;
507
  ASSERT(node != NULL);
L
Liu Jicong 已提交
508
  while (node) {
L
Liu Jicong 已提交
509
    ptr = tqSerializeTopic(&node->topic, ptr);
L
Liu Jicong 已提交
510 511
    node = node->next;
  }
512
  return ptr;
L
Liu Jicong 已提交
513
}
514

L
Liu Jicong 已提交
515 516
void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
  *(int64_t*)ptr = pTopic->nextConsumeOffset;
L
Liu Jicong 已提交
517
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
518
  *(int64_t*)ptr = pTopic->topicId;
L
Liu Jicong 已提交
519
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
520 521 522 523
  /**(int32_t*)ptr = pTopic->head;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /**(int32_t*)ptr = pTopic->tail;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
L
Liu Jicong 已提交
524
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
L
Liu Jicong 已提交
525
    ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
L
Liu Jicong 已提交
526
  }
527
  return ptr;
L
Liu Jicong 已提交
528 529
}

L
Liu Jicong 已提交
530
void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
L
Liu Jicong 已提交
531 532
  // TODO: do we need serialize this?
  // mainly for executor
533
  return ptr;
L
Liu Jicong 已提交
534 535
}

L
Liu Jicong 已提交
536 537 538
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
  STqGroup*   gHandle = *ppGroup;
  const void* ptr = pHead->content;
L
Liu Jicong 已提交
539
  gHandle->clientId = *(int64_t*)ptr;
540 541 542 543 544 545 546
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->cgId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->ahandle = NULL;
  gHandle->topicNum = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  gHandle->head = NULL;
L
Liu Jicong 已提交
547
  STqList* node = gHandle->head;
L
Liu Jicong 已提交
548 549
  for (int i = 0; i < gHandle->topicNum; i++) {
    if (gHandle->head == NULL) {
L
Liu Jicong 已提交
550
      if ((node = malloc(sizeof(STqList))) == NULL) {
L
Liu Jicong 已提交
551
        // TODO: error
552 553
        return NULL;
      }
L
Liu Jicong 已提交
554
      node->next = NULL;
L
Liu Jicong 已提交
555
      ptr = tqDeserializeTopic(ptr, &node->topic);
556 557
      gHandle->head = node;
    } else {
L
Liu Jicong 已提交
558
      node->next = malloc(sizeof(STqList));
L
Liu Jicong 已提交
559 560
      if (node->next == NULL) {
        // TODO: error
561 562 563
        return NULL;
      }
      node->next->next = NULL;
L
Liu Jicong 已提交
564
      ptr = tqDeserializeTopic(ptr, &node->next->topic);
565 566 567 568
      node = node->next;
    }
  }
  return ptr;
L
Liu Jicong 已提交
569
}
570

L
Liu Jicong 已提交
571
const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
572
  const void* ptr = pBytes;
L
Liu Jicong 已提交
573
  topic->nextConsumeOffset = *(int64_t*)ptr;
574
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
575
  topic->topicId = *(int64_t*)ptr;
576
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
L
Liu Jicong 已提交
577 578 579 580
  /*topic->head = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /*topic->tail = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
L
Liu Jicong 已提交
581
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
L
Liu Jicong 已提交
582
    ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
583 584
  }
  return ptr;
L
Liu Jicong 已提交
585 586
}

L
Liu Jicong 已提交
587
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; }
L
Liu Jicong 已提交
588

L
Liu Jicong 已提交
589
// TODO: make this a macro
L
Liu Jicong 已提交
590
int tqGroupSSize(const STqGroup* gHandle) {
L
Liu Jicong 已提交
591 592
  return sizeof(int64_t) * 2  // cId + cgId
         + sizeof(int32_t)    // topicNum
L
Liu Jicong 已提交
593
         + gHandle->topicNum * tqTopicSSize();
L
Liu Jicong 已提交
594
}
595

L
Liu Jicong 已提交
596
// TODO: make this a macro
L
Liu Jicong 已提交
597
int tqTopicSSize() {
L
Liu Jicong 已提交
598 599
  return sizeof(int64_t) * 2    // nextConsumeOffset + topicId
         + sizeof(int32_t) * 2  // head + tail
L
Liu Jicong 已提交
600
         + TQ_BUFFER_SIZE * tqItemSSize();
L
Liu Jicong 已提交
601
}
602

L
Liu Jicong 已提交
603
int tqItemSSize() {
L
Liu Jicong 已提交
604 605
  // TODO: do this need serialization?
  // mainly for executor
L
Liu Jicong 已提交
606 607
  return 0;
}
608

L
Liu Jicong 已提交
609
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
L
Liu Jicong 已提交
610
  SMqCVConsumeReq* pReq = pMsg->pCont;
L
Liu Jicong 已提交
611 612 613 614
  int64_t          reqId = pReq->reqId;
  int64_t          consumerId = pReq->consumerId;
  int64_t          offset = pReq->offset;
  int64_t          blockingTime = pReq->blockingTime;
L
Liu Jicong 已提交
615

L
Liu Jicong 已提交
616 617
  STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  int sz = taosArrayGetSize(pConsumer->topics);
L
Liu Jicong 已提交
618

L
Liu Jicong 已提交
619 620
  for (int i = 0 ; i < sz; i++) {
    STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i);
L
Liu Jicong 已提交
621

L
Liu Jicong 已提交
622 623 624 625
    int8_t           pos = offset % TQ_BUFFER_SIZE;
    int8_t           old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
    if (old == 1) {
      // do nothing
L
Liu Jicong 已提交
626
      continue;
L
Liu Jicong 已提交
627 628 629 630 631 632 633 634 635
    }
    if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
      // TODO
    }
    SWalHead* pHead = pHandle->pReadhandle->pHead;
    while (pHead->head.msgType != TDMT_VND_SUBMIT) {
      // read until find TDMT_VND_SUBMIT
    }
    SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
636
    void* task = pHandle->buffer.output[pos].task;
L
Liu Jicong 已提交
637

638 639 640 641
    qStreamExecTaskSetInput(task, pCont);
    SSDataBlock* pDataBlock;
    uint64_t ts;
    if (qExecTask(task, &pDataBlock, &ts) < 0) {
L
Liu Jicong 已提交
642

643
    }
L
Liu Jicong 已提交
644
    // TODO: launch query and get output data
645
    pHandle->buffer.output[pos].dst = pDataBlock;
L
Liu Jicong 已提交
646 647 648 649 650 651 652 653
    if (pHandle->buffer.firstOffset == -1
        || pReq->offset < pHandle->buffer.firstOffset) {
      pHandle->buffer.firstOffset = pReq->offset;
    }
    if (pHandle->buffer.lastOffset == -1
        || pReq->offset > pHandle->buffer.lastOffset) {
      pHandle->buffer.lastOffset = pReq->offset;
    }
L
Liu Jicong 已提交
654 655 656 657
    atomic_store_8(&pHandle->buffer.output[pos].status, 1);

    // put output into rsp
  }
L
Liu Jicong 已提交
658 659 660 661 662 663 664

  // launch query
  // get result
  SMqCvConsumeRsp* pRsp;
  return 0;
}

L
Liu Jicong 已提交
665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
  STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
  if (pConsumer == NULL) {
    return -1;
  }
  
  STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
  if (pTopic == NULL) {
    free(pConsumer);
    return -1;
  }
  strcpy(pTopic->topicName, pReq->topicName); 
  strcpy(pTopic->cgroup, pReq->cgroup); 
  strcpy(pTopic->sql, pReq->sql);
  strcpy(pTopic->logicalPlan, pReq->logicalPlan);
  strcpy(pTopic->physicalPlan, pReq->physicalPlan);
681

L
Liu Jicong 已提交
682 683 684 685
  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
686
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL);
L
Liu Jicong 已提交
687 688 689 690 691 692 693
  }
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  // write mq meta
  return 0;
}

STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) {
694 695 696 697 698
  STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
  if (pReadHandle == NULL) {
    return NULL;
  }
  pReadHandle->pMeta = pMeta;
L
Liu Jicong 已提交
699
  pReadHandle->pMsg = NULL;
700
  pReadHandle->ver = -1;
L
Liu Jicong 已提交
701
  pReadHandle->pColumnIdList = pColumnIdList;
702 703 704
  return NULL;
}

L
Liu Jicong 已提交
705 706 707 708 709 710 711
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
  pReadHandle->pMsg = pMsg;
  tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
  pReadHandle->ver = ver;
  memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
}

712
bool tqNextDataBlock(STqReadHandle* pHandle) {
L
Liu Jicong 已提交
713
  if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
714 715 716 717 718 719
    return false;
  }
  return true;
}

int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
L
Liu Jicong 已提交
720 721
  SMemRow         row;
  int32_t         sversion = pHandle->pBlock->sversion;
722 723 724 725
  SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
  pBlockInfo->numOfCols = pSchema->nCols;
  pBlockInfo->rows = pHandle->pBlock->numOfRows;
  pBlockInfo->uid = pHandle->pBlock->uid;
L
Liu Jicong 已提交
726
  // TODO: filter out unused column
727 728
  return 0;
}
L
Liu Jicong 已提交
729

L
Liu Jicong 已提交
730
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
L
Liu Jicong 已提交
731
  int32_t         sversion = pHandle->pBlock->sversion;
732
  SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
L
Liu Jicong 已提交
733 734
  STSchema*       pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
  SArray*         pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
735 736 737 738
  if (pArray == NULL) {
    return NULL;
  }
  SColumnInfoData colInfo;
L
Liu Jicong 已提交
739
  int             sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
740 741 742 743 744 745 746 747 748
  colInfo.pData = malloc(sz);
  if (colInfo.pData == NULL) {
    return NULL;
  }

  SMemRow row;
  int32_t kvIdx;
  while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
    for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
L
Liu Jicong 已提交
749 750
      // TODO: filter out unused column
      STColumn* pCol = schemaColAt(pTschema, i);
751
      void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
L
Liu Jicong 已提交
752
      // TODO: handle varlen
753 754 755 756 757 758
      memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
    }
  }
  taosArrayPush(pArray, &colInfo);
  return pArray;
}