tq.c 22.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
L
Liu Jicong 已提交
17
#include "tqMetaStore.h"
L
Liu Jicong 已提交
18
#include "tcompare.h"
S
Shengliang Guan 已提交
19

L
Liu Jicong 已提交
20 21
// static
// read next version data
L
Liu Jicong 已提交
22
//
L
Liu Jicong 已提交
23
// send to fetch queue
L
Liu Jicong 已提交
24
//
L
Liu Jicong 已提交
25
// handle management message
L
Liu Jicong 已提交
26
//
L
Liu Jicong 已提交
27

L
Liu Jicong 已提交
28 29 30
int tqGroupSSize(const STqGroup* pGroup);
int tqTopicSSize();
int tqItemSSize();
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
void* tqSerializeListHandle(STqList* listHandle, void* ptr);
void* tqSerializeTopic(STqTopic* pTopic, void* ptr);
void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
L
Liu Jicong 已提交
35

L
Liu Jicong 已提交
36 37
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
L
Liu Jicong 已提交
38

L
Liu Jicong 已提交
39 40
// One-time initialisation of the module-wide TQ state.
// The compare-and-swap on tqMgmt.inited (0 -> 1) lets only the first caller
// through to create the timer; a caller that observes the old value 1 returns
// straight away. Always returns 0.
int tqInit() {
  if (atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1) == 1) {
    return 0;
  }

  tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
  return 0;
}

// Counterpart of tqInit(): flips tqMgmt.inited from 1 back to 0 and, only when
// this call performed that transition, stops and cleans up the module timer.
void tqCleanUp() {
  const int8_t wasInited = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
  if (wasInited != 0) {
    taosTmrStop(tqMgmt.timer);
    taosTmrCleanUp(tqMgmt.timer);
  }
}

L
Liu Jicong 已提交
54
// Allocate and initialise a TQ instance.
//
//   path     directory handed to tqStoreOpen(); a private copy is kept in pTq->path
//   pWal     stored as-is in the instance
//   pMeta    stored as-is in the instance
//   tqConfig stored as-is in the instance
//   allocFac only referenced by the disabled (#if 0) allocator code
//
// Returns the new instance, or NULL on failure. terrno is set to
// TSDB_CODE_TQ_OUT_OF_MEMORY when an allocation in this function fails.
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
  STQ* pTq = malloc(sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = strdup(path);
  if (pTq->path == NULL) {
    // strdup() allocates; without this check a NULL path would be stored silently.
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    free(pTq);
    return NULL;
  }
  pTq->tqConfig = tqConfig;
  pTq->pWal = pWal;
  pTq->pMeta = pMeta;
#if 0
  pTq->tqMemRef.pAllocatorFactory = allocFac;
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
    // TODO: error code of buffer pool
  }
#endif
  pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
  if (pTq->tqMeta == NULL) {
#if 0
    // Must run before pTq is released: it reads pTq->tqMemRef.
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
    // Release the duplicated path as well, otherwise it leaks on this error path.
    free((void*)pTq->path);
    free(pTq);
    return NULL;
  }

  return pTq;
}
L
Liu Jicong 已提交
82

L
Liu Jicong 已提交
83 84 85
// Close a TQ instance. Not implemented yet: nothing owned by pTq is released here.
void tqClose(STQ* pTq) {
  // TODO
  (void)pTq;
}
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87 88 89 90
// Protocol version gate: returns 1 when the message carries protoVer 0,
// and 0 for any other version.
static int tqProtoCheck(STqMsgHead* pMsg) {
  // TODO
  if (pMsg->protoVer != 0) {
    return 0;
  }
  return 1;
}
L
Liu Jicong 已提交
91

L
Liu Jicong 已提交
92
// Acknowledge one consumed message of a topic.
//
// The buffer slot addressed by pAck->consumeOffset (modulo TQ_BUFFER_SIZE) must
// hold both content and an executor; its content is released.
//
// ppQuery is meant to receive a follow-up query message, but that hand-off is
// still commented out below, so it is currently left untouched.
//
// Returns 0 on success, -1 (terrno = TSDB_CODE_TQ_OUT_OF_MEMORY) when the query
// message cannot be allocated.
static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
  // clean old item and move forward
  int32_t consumeOffset = pAck->consumeOffset;
  int     idx = consumeOffset % TQ_BUFFER_SIZE;
  ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor);
  tfree(pTopic->buffer[idx].content);
  if (1 /* TODO: need to launch new query */) {
    STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
    if (pNewQuery == NULL) {
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    // TODO: lock executor
    // TODO: read from wal and assign to src
    /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
    /*pNewQuery->exec->src = 0;*/
    /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
    /*pNewQuery->next = *ppQuery;*/
    /**ppQuery = pNewQuery;*/

    // Until the hand-off above is enabled nothing takes ownership of the
    // message, so release it here instead of leaking it on every ack.
    free(pNewQuery);
  }
  return 0;
}

L
Liu Jicong 已提交
115
// Apply a batch of acks to the topics of a group.
//
// Walks the ack array and the group's topic list in lock-step, comparing topic
// ids: an ack whose id is smaller than the current topic is skipped, a topic
// whose id is smaller than the current ack is skipped, and a match is
// acknowledged via tqAckOneTopic(). The first list node is never inspected
// itself; only node->next entries are compared.
//
// Returns the number of acks that were applied successfully.
static int tqAck(STqGroup* pGroup, STqAcks* pAcks) {
  int32_t    ackNum = pAcks->ackNum;
  STqOneAck* acks = pAcks->acks;
  // double ptr for acks and list
  int          i = 0;
  STqList*     node = pGroup->head;
  int          ackCnt = 0;
  STqQueryMsg* pQuery = NULL;
  // A group without a list head has nothing to acknowledge.
  while (node != NULL && i < ackNum && node->next) {
    if (acks[i].topicId == node->next->topic.topicId) {
      if (tqAckOneTopic(&node->next->topic, &acks[i], &pQuery) == 0) {
        ackCnt++;
      }
      // Consume this ack; without advancing, the same pair would be matched
      // again forever.
      i++;
    } else if (acks[i].topicId < node->next->topic.topicId) {
      i++;
    } else {
      node = node->next;
    }
  }
  if (pQuery) {
    // post message
  }
  return ackCnt;
}
L
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
// Placeholder for persisting a group's modifications to disk.
// Currently a no-op that always returns 0.
static int tqCommitGroup(STqGroup* pGroup) {
  // persist modification into disk
  (void)pGroup;
  return 0;
}

L
Liu Jicong 已提交
144
// Create a new, zero-initialised consumer group with an empty topic list.
//
//   cId / cgId are recorded in the group's clientId / cgId fields (the same
//   fields tqSerializeGroup() writes out).
//   topicId and pTq are currently unused.
//
// On success stores the group in *ppGroup and returns 0. On failure returns -1
// and leaves *ppGroup untouched, so the caller never sees a freed pointer.
int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) {
  // create in disk
  STqGroup* pGroup = calloc(1, sizeof(STqGroup));
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  pGroup->topicList = tdListNew(sizeof(STqTopic));
  if (pGroup->topicList == NULL) {
    free(pGroup);
    return -1;
  }

  pGroup->clientId = cId;
  pGroup->cgId = cgId;

  // Publish only a fully constructed group.
  *ppGroup = pGroup;
  return 0;
}

L
Liu Jicong 已提交
164 165 166 167 168
// Look up the group stored under cId in the TQ meta store; when none exists,
// create one with tqCreateGroup() and register it via tqHandleMovePut().
// Returns the group, or NULL when creation fails.
STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  STqGroup* pFound = tqHandleGet(pTq->tqMeta, cId);

  if (pFound == NULL) {
    if (tqCreateGroup(pTq, topicId, cgId, cId, &pFound) < 0) {
      // TODO
      return NULL;
    }
    tqHandleMovePut(pTq->tqMeta, cId, pFound);
  }

  ASSERT(pFound);
  return pFound;
}
L
Liu Jicong 已提交
178

L
Liu Jicong 已提交
179 180 181 182
// Placeholder for closing a consumer group. Not implemented; always returns 0.
int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  // TODO
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  return 0;
}
L
Liu Jicong 已提交
183

L
Liu Jicong 已提交
184
// Placeholder for deleting a consumer group from disk. Not implemented;
// always returns 0.
int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  // delete from disk
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  return 0;
}

L
Liu Jicong 已提交
189 190 191
// Collect the next ready message of every topic in a group into *pRsp.
//
// *pRsp is (re)allocated here; each message is appended to (*pRsp)->msgs as
//   int64 topicId | int64 payload size | payload bytes
// in host byte order. Iteration starts at pGroup->head->next and stops when the
// list ends, when the accumulated payload exceeds the 4 KiB soft limit (the
// message that crosses the limit is still included), or when the response
// cannot be grown (messages copied so far are kept, terrno is set).
//
// (*pRsp)->bodySize receives the sum of the payload sizes, headers excluded.
// Returns the number of messages copied, or -1 if the initial allocation fails.
static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
  // TODO: make it a macro
  const int     sizeLimit = 4 * 1024;
  const int64_t headSize = 2 * (int64_t)sizeof(int64_t);  // topicId + size
  int64_t       capacity = sizeLimit;  // bytes available behind the response header
  int64_t       used = 0;              // bytes written so far, headers included
  int           totSize = 0;           // payload bytes only
  int           numOfMsgs = 0;

  void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + capacity);
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  *pRsp = ptr;

  // iterate the list to get msgs of all topics
  // until all topic iterated or msgs over sizeLimit
  STqList* pNode = pGroup->head;
  while (pNode != NULL && pNode->next) {
    pNode = pNode->next;
    STqTopic* pTopic = &pNode->topic;
    int       idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE;
    if (pTopic->buffer[idx].content == NULL || pTopic->buffer[idx].offset != pTopic->nextConsumeOffset) {
      continue;
    }

    int64_t topicId = pTopic->topicId;
    int64_t msgSize = pTopic->buffer[idx].size;
    int64_t need = used + headSize + msgSize;
    if (need > capacity) {
      // The per-message header counts against the allocation too; grow before
      // writing so the copy below can never run past the buffer.
      void* grown = realloc(*pRsp, sizeof(STqConsumeRsp) + need);
      if (grown == NULL) {
        terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
        // return msgs already copied
        break;
      }
      *pRsp = grown;
      capacity = need;
    }

    // Recompute the write position from the (possibly moved) response.
    char* out = (char*)(*pRsp)->msgs + used;
    memcpy(out, &topicId, sizeof(int64_t));
    memcpy(out + sizeof(int64_t), &msgSize, sizeof(int64_t));
    memcpy(out + headSize, pTopic->buffer[idx].content, msgSize);

    used = need;
    totSize += (int)msgSize;
    numOfMsgs++;
    if (totSize > sizeLimit) {
      break;
    }
  }
  (*pRsp)->bodySize = totSize;
  return numOfMsgs;
}

L
Liu Jicong 已提交
240
// Fetch the group registered under clientId in the TQ meta store.
// Returns whatever tqHandleGet() yields for that key.
STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) {
  STqGroup* pGroup = tqHandleGet(pTq->tqMeta, clientId);
  return pGroup;
}
L
Liu Jicong 已提交
241

L
Liu Jicong 已提交
242 243 244 245 246 247 248 249 250 251 252
// Mark a buffer item as having a query in flight.
// If tqQueryExecuting() already reports the item as executing, nothing changes;
// otherwise its status is set to 1. The actual query launch is not implemented
// yet (see the steps below). `offset` is currently unused. Always returns 0.
int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) {
  bool alreadyRunning = tqQueryExecuting(bufItem->status);
  if (!alreadyRunning) {
    bufItem->status = 1;
    // load data from wal or buffer pool
    // put into exec
    // send exec into non blocking queue
    // when query finished, put into buffer pool
  }
  return 0;
}
L
Liu Jicong 已提交
253

L
Liu Jicong 已提交
254
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
L
Liu Jicong 已提交
255
/*return 0;*/
L
Liu Jicong 已提交
256 257
/*}*/

L
Liu Jicong 已提交
258 259 260
// Placeholder hook for a newly written message of the given version.
// Not implemented; always returns 0.
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
  // add reference
  // judge and launch new query
  (void)pTq;
  (void)p;
  (void)version;
  return 0;
}

L
Liu Jicong 已提交
264
// Commit hook of the TQ module. Intentionally does nothing; always returns 0.
int tqCommit(STQ* pTq) {
  // do nothing
  (void)pTq;
  return 0;
}

L
Liu Jicong 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
// Reset a topic's ring buffer so that consumption restarts at `offset`.
//
// The whole buffer is zeroed, tqSendLaunchQuery() is invoked once for every
// slot (starting with the slot that `offset` maps to), and both
// nextConsumeOffset and floatingCursor are set to `offset`.
//
// Returns 0 on success, or -1 for a negative offset, which would otherwise map
// to a negative slot index.
//
// NOTE(review): the memset discards any content/executor pointers still stored
// in the slots without releasing them — confirm callers expect that.
int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) {
  if (offset < 0) {
    return -1;
  }

  memset(pTopic->buffer, 0, sizeof(pTopic->buffer));

  // launch query
  // Use 64-bit arithmetic: narrowing the offset to int would pick wrong slots
  // (or overflow) for large offsets.
  const int64_t first = offset % TQ_BUFFER_SIZE;
  for (int64_t i = 0; i < TQ_BUFFER_SIZE; i++) {
    int pos = (int)((first + i) % TQ_BUFFER_SIZE);
    int code = tqSendLaunchQuery(&pTopic->buffer[pos], offset);
    if (code < 0) {
      // TODO: error handling
    }
  }

  // set offset
  pTopic->nextConsumeOffset = offset;
  pTopic->floatingCursor = offset;
  return 0;
}

// Placeholder for locating a topic inside a group by id.
// Not implemented: always returns NULL, so callers treat every topic as missing.
STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) {
  // TODO
  (void)pGroup;
  (void)topicId;
  return NULL;
}

// Move a client's consume cursor for one topic to the offset in the request.
//
// Returns -1 when the client has no group, the topic cannot be found, or the
// buffer reset fails; returns 0 when the cursor already sits at the requested
// offset or was moved successfully.
int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
  const int64_t clientId = pMsg->head.clientId;
  const int64_t topicId = pMsg->topicId;
  const int64_t offset = pMsg->offset;

  STqGroup* gHandle = tqGetGroup(pTq, clientId);
  if (!gHandle) {
    // client not connect
    return -1;
  }

  STqTopic* topicHandle = tqFindTopic(gHandle, topicId);
  if (!topicHandle) {
    return -1;
  }

  if (topicHandle->nextConsumeOffset == pMsg->offset) {
    return 0;
  }
  // TODO: check log last version

  // set error code
  return (tqBufferSetOffset(topicHandle, offset) < 0) ? -1 : 0;
}

L
Liu Jicong 已提交
319 320
// temporary
// Record the response handle of a consume request on the client's group.
// Returns 0 on success; -1 with terrno = TSDB_CODE_TQ_GROUP_NOT_SET when the
// client has no group.
int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
  STqGroup* pGroup = tqGetGroup(pTq, pMsg->head.clientId);
  if (!pGroup) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }

  pGroup->rspHandle.handle = pRsp->handle;
  pGroup->rspHandle.ahandle = pRsp->ahandle;
  return 0;
}

// Serve a consume request for the group of the requesting client.
//
// Phase 1: walk the group's topic list. For each topic whose slot at
//   floatingCursor holds content for exactly that offset:
//     READY   -> append "int64 topicId | int64 size | payload" (both integers
//                big-endian via htobe64) to the response body;
//     PROCESS -> nothing to do;
//     EMPTY   -> claim the slot (EMPTY -> PROCESS) and queue it for phase 2.
//   Collection stops once the accumulated payload exceeds 4096 bytes (the
//   message that crosses the limit is still included) or the response cannot
//   be grown.
// Phase 2: if at least one message was collected, send the response.
// Phase 3: run the executor of every claimed slot and mark it READY.
//
// Returns 0 on success; -1 with terrno set when the group is unknown or an
// initial allocation fails.
int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
  STqConsumeReq* pMsg = pReq->pCont;
  int64_t        clientId = pMsg->head.clientId;
  STqGroup*      pGroup = tqGetGroup(pTq, clientId);
  if (pGroup == NULL) {
    terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
    return -1;
  }

  SList* topicList = pGroup->topicList;

  const int     sizeLimit = 4096;
  const int64_t headSize = 2 * (int64_t)sizeof(int64_t);  // topicId + size
  int64_t       capacity = sizeLimit;  // bytes available behind the response header
  int64_t       used = 0;              // bytes written so far, headers included
  int           totSize = 0;           // payload bytes only
  int           numOfMsgs = 0;

  void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + capacity);
  if (ptr == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  (*pRsp)->pCont = ptr;
  // Always derive the write position from the current allocation; the previous
  // code wrote through a NULL buffer pointer.
  STqConsumeRsp* pCsmRsp = ptr;

  SArray* pArray = taosArrayInit(0, sizeof(void*));
  if (pArray == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  SListIter iter;
  tdListInitIter(topicList, &iter, TD_LIST_FORWARD);

  SListNode* pn;
  while ((pn = tdListNext(&iter)) != NULL) {
    // NOTE(review): tqCreateGroup() builds this list with element size
    // sizeof(STqTopic), yet the node data is read here as a STqTopic* —
    // confirm which representation is intended.
    STqTopic*   pTopic = *(STqTopic**)pn->data;
    int         idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
    STqMsgItem* pItem = &pTopic->buffer[idx];
    if (pItem->content == NULL || pItem->offset != pTopic->floatingCursor) {
      continue;
    }

    if (pItem->status == TQ_ITEM_READY) {
      // if has data
      int64_t msgSize = pItem->size;
      int64_t need = used + headSize + msgSize;
      if (need > capacity) {
        void* grown = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + need);
        if (grown == NULL) {
          terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
          // return msgs already copied
          break;
        }
        (*pRsp)->pCont = grown;
        pCsmRsp = grown;
        capacity = need;
      }

      char*   out = (char*)pCsmRsp->msgs + used;
      int64_t beTopicId = htobe64(pTopic->topicId);
      int64_t beSize = htobe64(msgSize);
      memcpy(out, &beTopicId, sizeof(int64_t));
      memcpy(out + sizeof(int64_t), &beSize, sizeof(int64_t));
      memcpy(out + headSize, pItem->content, msgSize);

      used = need;
      totSize += (int)msgSize;
      numOfMsgs++;
      if (totSize > sizeLimit) {
        break;
      }
    } else if (pItem->status == TQ_ITEM_PROCESS) {
      // if not have data but in process

    } else if (pItem->status == TQ_ITEM_EMPTY) {
      // if not have data and not in process
      int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
      if (old != TQ_ITEM_EMPTY) {
        continue;
      }
      pItem->offset = pTopic->floatingCursor;
      taosArrayPush(pArray, &pItem);
    } else {
      ASSERT(0);
    }
  }

  if (numOfMsgs > 0) {
    // set code and other msg
    rpcSendResponse(*pRsp);
  } else {
    // most recent data has been fetched

    // enable timer for blocking wait
    // once new data written when waiting, launch query and rsp
  }

  // fetched a num of msgs, rpc response
  int numOfClaimed = (int)taosArrayGetSize(pArray);
  for (int i = 0; i < numOfClaimed; i++) {
    // The array stores STqMsgItem* values; taosArrayGet() yields the address
    // of the stored element, so one more dereference is needed.
    STqMsgItem* pItem = *(STqMsgItem**)taosArrayGet(pArray, i);

    // read from wal
    void* raw = NULL;
    /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
    /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
    /*if (code < 0) {*/
    // TODO: error
    /*}*/
    // get msgType
    // if submitblk
    pItem->executor->assign(pItem->executor->runtimeEnv, raw);
    SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
    pItem->content = content;
    // if other type, send just put into buffer
    /*pItem->content = raw;*/

    int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
    ASSERT(old == TQ_ITEM_PROCESS);
  }
  taosArrayDestroy(pArray);

  return 0;
}

#if 0
// Disabled legacy consume path, compiled out and kept for reference only.
// NOTE(review): tqAck() returns the number of applied acks, yet any non-zero
// result is treated as a failure here, and the argument passed to tqFetch()
// does not match its STqConsumeRsp** parameter — both need revisiting before
// this code is re-enabled.
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
  if (!tqProtoCheck((STqMsgHead*)pMsg)) {
    // proto version invalid
    return -1;
  }

  STqGroup* pGroup = tqGetGroup(pTq, pMsg->head.clientId);
  if (pGroup == NULL) {
    // client not connect
    return -1;
  }

  if (pMsg->acks.ackNum != 0 && tqAck(pGroup, &pMsg->acks) != 0) {
    // ack not success
    return -1;
  }

  STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
  if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) {
    // fetch error
    return -1;
  }

  // judge and launch new query
  /*if (tqSendLaunchQuery(gHandle)) {*/
  // launch query error
  /*return -1;*/
  /*}*/
  return 0;
}
#endif
L
Liu Jicong 已提交
480

L
Liu Jicong 已提交
481
// Serialise a group into the buffer behind *ppHead.
//
// Layout written to (*ppHead)->content:
//   int64 clientId | int64 cgId | int32 topicNum | topics (only if topicNum > 0)
//
// The buffer is grown with realloc() when its recorded ssize is too small, in
// which case *ppHead and its ssize are updated.
//
// Returns 0 on success. On allocation failure returns -1 with terrno set; the
// old buffer is released and *ppHead is set to NULL so the caller cannot reuse
// or double-free it.
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
  // calculate size
  int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
  if (sz > (*ppHead)->ssize) {
    void* tmpPtr = realloc(*ppHead, sz);
    if (tmpPtr == NULL) {
      free(*ppHead);
      // Do not leave a dangling pointer behind for the caller.
      *ppHead = NULL;
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }
  void* ptr = (*ppHead)->content;
  // do serialization
  *(int64_t*)ptr = pGroup->clientId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = pGroup->cgId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int32_t*)ptr = pGroup->topicNum;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  if (pGroup->topicNum > 0) {
    tqSerializeListHandle(pGroup->head, ptr);
  }
  return 0;
}

L
Liu Jicong 已提交
508 509
// Serialise every node of a topic list, starting with listHandle itself, by
// calling tqSerializeTopic() on each node in order.
// listHandle must not be NULL. Returns the position just past the last byte
// written.
void* tqSerializeListHandle(STqList* listHandle, void* ptr) {
  ASSERT(listHandle != NULL);
  for (STqList* cur = listHandle; cur != NULL; cur = cur->next) {
    ptr = tqSerializeTopic(&cur->topic, ptr);
  }
  return ptr;
}
517

L
Liu Jicong 已提交
518 519
// Serialise one topic at ptr:
//   int64 nextConsumeOffset | int64 topicId | TQ_BUFFER_SIZE serialised items
// Returns the position just past the last byte written.
//
// The integers are stored with memcpy() because topics follow a 20-byte group
// header in the stream, so ptr is generally not 8-byte aligned and a direct
// int64_t store through a cast pointer would be a misaligned access.
void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
  int64_t nextConsumeOffset = pTopic->nextConsumeOffset;
  int64_t topicId = pTopic->topicId;

  memcpy(ptr, &nextConsumeOffset, sizeof(int64_t));
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(ptr, &topicId, sizeof(int64_t));
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  /**(int32_t*)ptr = pTopic->head;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /**(int32_t*)ptr = pTopic->tail;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
  }
  return ptr;
}

L
Liu Jicong 已提交
533
// Serialise one buffer item. Nothing is written at the moment, so the output
// position is returned unchanged.
void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
  // TODO: do we need serialize this?
  // mainly for executor
  (void)bufItem;
  return ptr;
}

L
Liu Jicong 已提交
539 540 541
// Rebuild a group from its serialised form (see tqSerializeGroup()).
//
// Fills the already allocated group *ppGroup: clientId, cgId and topicNum are
// read from pHead->content, ahandle is cleared, and a fresh list of topicNum
// nodes is built under head.
//
// Returns the position just past the consumed bytes, or NULL when a node cannot
// be allocated. In that case the partially built list is released and the group
// is left with head == NULL and topicNum == 0.
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
  STqGroup*   gHandle = *ppGroup;
  const void* ptr = pHead->content;
  int64_t     i64;
  int32_t     i32;

  // memcpy() avoids depending on the alignment of the serialised stream.
  memcpy(&i64, ptr, sizeof(int64_t));
  gHandle->clientId = i64;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(&i64, ptr, sizeof(int64_t));
  gHandle->cgId = i64;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->ahandle = NULL;
  memcpy(&i32, ptr, sizeof(int32_t));
  gHandle->topicNum = i32;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  gHandle->head = NULL;

  // ppTail always points at the link that must receive the next node.
  STqList** ppTail = &gHandle->head;
  for (int i = 0; i < gHandle->topicNum; i++) {
    // calloc: tqDeserializeTopic() fills only part of the topic, so the rest
    // (including the message buffer) must start out zeroed.
    STqList* node = calloc(1, sizeof(STqList));
    if (node == NULL) {
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      // Drop what was built so far instead of leaking it.
      STqList* cur = gHandle->head;
      while (cur != NULL) {
        STqList* next = cur->next;
        free(cur);
        cur = next;
      }
      gHandle->head = NULL;
      gHandle->topicNum = 0;
      return NULL;
    }
    ptr = tqDeserializeTopic(ptr, &node->topic);
    *ppTail = node;
    ppTail = &node->next;
  }
  return ptr;
}
573

L
Liu Jicong 已提交
574
// Read one serialised topic (see tqSerializeTopic()) from pBytes:
//   int64 nextConsumeOffset | int64 topicId | TQ_BUFFER_SIZE serialised items
// Returns the position just past the consumed bytes.
//
// Values are fetched with memcpy() because the input position is generally not
// 8-byte aligned (topics follow a 20-byte group header).
const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
  const void* ptr = pBytes;
  int64_t     value;

  memcpy(&value, ptr, sizeof(int64_t));
  topic->nextConsumeOffset = value;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(&value, ptr, sizeof(int64_t));
  topic->topicId = value;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  /*topic->head = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  /*topic->tail = *(int32_t*)ptr;*/
  /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
  }
  return ptr;
}

L
Liu Jicong 已提交
590
// Read one serialised buffer item. Items carry no serialised data at the
// moment, so nothing is consumed and the input position is returned unchanged.
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) {
  (void)bufItem;
  return pBytes;
}
L
Liu Jicong 已提交
591

L
Liu Jicong 已提交
592
// TODO: make this a macro
L
Liu Jicong 已提交
593
int tqGroupSSize(const STqGroup* gHandle) {
L
Liu Jicong 已提交
594 595
  return sizeof(int64_t) * 2  // cId + cgId
         + sizeof(int32_t)    // topicNum
L
Liu Jicong 已提交
596
         + gHandle->topicNum * tqTopicSSize();
L
Liu Jicong 已提交
597
}
598

L
Liu Jicong 已提交
599
// TODO: make this a macro
L
Liu Jicong 已提交
600
int tqTopicSSize() {
L
Liu Jicong 已提交
601 602
  return sizeof(int64_t) * 2    // nextConsumeOffset + topicId
         + sizeof(int32_t) * 2  // head + tail
L
Liu Jicong 已提交
603
         + TQ_BUFFER_SIZE * tqItemSSize();
L
Liu Jicong 已提交
604
}
605

L
Liu Jicong 已提交
606
// Number of bytes one buffer item occupies when serialised.
// Items are not serialised at the moment, so this is 0.
int tqItemSSize() {
  // TODO: do this need serialization?
  // mainly for executor
  const int itemBytes = 0;
  return itemBytes;
}
611

L
Liu Jicong 已提交
612
// Handle a consume request: for every topic of the requesting consumer, locate
// a TDMT_VND_SUBMIT WAL entry starting at the requested offset, run the topic's
// stream task on it and store the produced blocks in the topic's output slot.
//
// Per topic:
//   1. Claim the output slot for the current fetch offset (status 0 -> 1); a
//      slot that is already claimed causes the topic to be skipped.
//   2. Read the WAL at that offset. A non-submit entry releases the slot and
//      advances the offset; a read error releases the slot and skips the topic.
//   3. Feed the submit message to the slot's task and collect result blocks
//      until the task fails or yields no block.
//   4. Release the slot; if blocks were produced, publish them in the slot and
//      widen the buffer's first/last offset range with the request offset.
//
// ppRsp is not filled in yet. Returns 0, or -1 when the consumer is unknown or
// the result array cannot be allocated.
//
// NOTE(review): fetchOffset is shared across topics, so a later topic starts
// from wherever an earlier one stopped — confirm this is intended.
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
  SMqConsumeReq* pReq = pMsg->pCont;
  int64_t        consumerId = pReq->consumerId;
  int64_t        fetchOffset = pReq->offset;

  STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  if (pConsumer == NULL) {
    // Unknown consumer: nothing was registered under this id.
    return -1;
  }
  int sz = taosArrayGetSize(pConsumer->topics);

  for (int i = 0; i < sz; i++) {
    STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);

    int8_t    pos;
    int8_t    skip = 0;
    SWalHead* pHead = NULL;
    while (1) {
      pos = fetchOffset % TQ_BUFFER_SIZE;
      skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
      if (skip == 1) {
        // do nothing
        break;
      }
      if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
        // check err
        atomic_store_8(&pTopic->buffer.output[pos].status, 0);
        skip = 1;
        break;
      }
      // read until find TDMT_VND_SUBMIT
      pHead = pTopic->pReadhandle->pHead;
      if (pHead->head.msgType == TDMT_VND_SUBMIT) {
        break;
      }
      // NOTE(review): this re-reads the same offset that was just read above.
      if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
        atomic_store_8(&pTopic->buffer.output[pos].status, 0);
        skip = 1;
        break;
      }
      atomic_store_8(&pTopic->buffer.output[pos].status, 0);
      fetchOffset++;
    }
    if (skip == 1) continue;
    SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
    qTaskInfo_t task = pTopic->buffer.output[pos].task;

    qSetStreamInput(task, pCont);

    //SArray<SSDataBlock>
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      // Give the slot back before bailing out so it is not left claimed.
      atomic_store_8(&pTopic->buffer.output[pos].status, 0);
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    while (1) {
      SSDataBlock* pDataBlock;
      uint64_t     ts;
      if (qExecTask(task, &pDataBlock, &ts) < 0) {
        break;
      }
      if (pDataBlock != NULL) {
        taosArrayPush(pRes, pDataBlock);
      } else {
        break;
      }
    }

    atomic_store_8(&pTopic->buffer.output[pos].status, 0);

    if (taosArrayGetSize(pRes) == 0) {
      taosArrayDestroy(pRes);
      fetchOffset++;
      continue;
    }

    pTopic->buffer.output[pos].dst = pRes;
    if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
      pTopic->buffer.firstOffset = pReq->offset;
    }
    if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
      pTopic->buffer.lastOffset = pReq->offset;
    }
    // put output into rsp
  }

  // launch query
  // get result
  return 0;
}

L
Liu Jicong 已提交
702 703 704 705 706
// Handle a "set consumer/vgroup connection" request: build a consumer handle
// and a topic handle from the request, open a WAL read handle for the topic and
// create one stream task per output slot.
//
// Returns 0 on success, -1 when an allocation fails or the WAL read handle
// cannot be opened (everything allocated so far is released).
//
// NOTE(review): on success neither pConsumer nor pTopic is stored anywhere yet
// ("write mq meta" is still to do), so both are currently unreachable after
// return.
// NOTE(review): the strcpy() calls assume every destination field is large
// enough for the corresponding request string — confirm against the
// STqTopicHandle definition.
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
  STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
  if (pConsumer == NULL) {
    return -1;
  }

  STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
  if (pTopic == NULL) {
    free(pConsumer);
    return -1;
  }
  strcpy(pTopic->topicName, pReq->topicName);
  strcpy(pTopic->cgroup, pReq->cgroup);
  strcpy(pTopic->sql, pReq->sql);
  strcpy(pTopic->logicalPlan, pReq->logicalPlan);
  strcpy(pTopic->physicalPlan, pReq->physicalPlan);

  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  if (pTopic->pReadhandle == NULL) {
    // Without a read handle the topic can never be consumed; fail instead of
    // carrying on with a NULL handle.
    free(pTopic);
    free(pConsumer);
    return -1;
  }
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, pReadHandle);
  }
  // write mq meta
  return 0;
}

L
Liu Jicong 已提交
733
// Allocate a submit-message read handle bound to pMeta.
// The handle starts with no message (pMsg == NULL), version -1 and no column
// id list; all remaining fields are zero-initialised.
// Returns the new handle, or NULL when the allocation fails.
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
  STqReadHandle* pReadHandle = calloc(1, sizeof(STqReadHandle));
  if (pReadHandle == NULL) {
    return NULL;
  }
  pReadHandle->pMeta = pMeta;
  pReadHandle->pMsg = NULL;
  pReadHandle->ver = -1;
  pReadHandle->pColIdList = NULL;
  // Hand the handle to the caller; returning NULL here leaked it and made
  // every call look like an allocation failure.
  return pReadHandle;
}

L
Liu Jicong 已提交
745 746 747 748 749 750 751
// Point a read handle at a new submit message: records the message and its
// version, re-initialises the message iterator on it and clears the block
// iterator.
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
  pReadHandle->pMsg = pMsg;
  pReadHandle->ver = ver;

  tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
  memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
}

752
// Advance the handle's message iterator to the next block whose uid equals
// pHandle->tbUid. Returns true when such a block is now in pHandle->pBlock,
// false once tGetSubmitMsgNext() reports a negative result.
bool tqNextDataBlock(STqReadHandle* pHandle) {
  for (;;) {
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
      return false;
    }
    if (pHandle->pBlock->uid == pHandle->tbUid) {
      return true;
    }
  }
}

// Describe the handle's current block in pBlockInfo: the column count comes
// from the size of pColIdList, the row count and uid from the block itself.
// Always returns 0.
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
  pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);

  pBlockInfo->rows = pHandle->pBlock->numOfRows;
  pBlockInfo->uid = pHandle->pBlock->uid;

  return 0;
}
L
Liu Jicong 已提交
767

L
Liu Jicong 已提交
768
// Materialise the handle's current submit block as an array of SColumnInfoData.
//
// Looks up the schema of the block's table at the block's schema version,
// allocates one data buffer and copies every column value of every row into it
// at the column's schema offset. The resulting single SColumnInfoData is pushed
// into the returned array.
//
// Returns the array, or NULL when a schema lookup or an allocation fails.
//
// NOTE(review): every row is written to the same offsets, so later rows
// overwrite earlier ones, and the buffer size is derived from the first schema
// column only — this looks unfinished; confirm the intended layout.
// NOTE(review): pSchemaWrapper and pTschema are not released here; whether the
// caller of metaGetTableSchema()/metaGetTbTSchema() owns them needs checking.
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
  int32_t         sversion = pHandle->pBlock->sversion;
  SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
  STSchema*       pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
  if (pSchemaWrapper == NULL || pTschema == NULL) {
    return NULL;
  }

  SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
  if (pArray == NULL) {
    return NULL;
  }

  // Zero the descriptor so fields other than pData are not pushed as garbage.
  SColumnInfoData colInfo = {0};
  int             sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
  colInfo.pData = malloc(sz);
  if (colInfo.pData == NULL) {
    taosArrayDestroy(pArray);
    return NULL;
  }

  SMemRow row;
  while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
    // Cursor used by tdGetMemRowDataOfColEx(); it must start from a defined
    // value for every row (it was previously read uninitialised).
    int32_t kvIdx = 0;
    for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
      // TODO: filter out unused column
      STColumn* pCol = schemaColAt(pTschema, i);
      void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
      // TODO: handle varlen
      memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
    }
  }

  if (taosArrayPush(pArray, &colInfo) == NULL) {
    free(colInfo.pData);
    taosArrayDestroy(pArray);
    return NULL;
  }
  return pArray;
}