tq.c 19.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tcompare.h"
L
Liu Jicong 已提交
17
#include "tqInt.h"
L
Liu Jicong 已提交
18
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
19

L
Liu Jicong 已提交
20 21
void tqDebugShowSSData(SArray* dataBlocks);

L
Liu Jicong 已提交
22
int32_t tqInit() { return tqPushMgrInit(); }
L
Liu Jicong 已提交
23

L
Liu Jicong 已提交
24
void tqCleanUp() { tqPushMgrCleanUp(); }
L
Liu Jicong 已提交
25

L
Liu Jicong 已提交
26 27
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig,
            SMemAllocatorFactory* allocFac) {
L
Liu Jicong 已提交
28
  STQ* pTq = malloc(sizeof(STQ));
L
Liu Jicong 已提交
29
  if (pTq == NULL) {
L
Liu Jicong 已提交
30
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
31 32
    return NULL;
  }
H
Hongze Cheng 已提交
33
  pTq->path = strdup(path);
L
Liu Jicong 已提交
34
  pTq->tqConfig = tqConfig;
L
Liu Jicong 已提交
35
  pTq->pVnode = pVnode;
L
Liu Jicong 已提交
36
  pTq->pWal = pWal;
L
Liu Jicong 已提交
37
  pTq->pVnodeMeta = pVnodeMeta;
L
Liu Jicong 已提交
38
#if 0
L
Liu Jicong 已提交
39
  pTq->tqMemRef.pAllocatorFactory = allocFac;
L
Liu Jicong 已提交
40 41
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
L
Liu Jicong 已提交
42
    // TODO: error code of buffer pool
L
Liu Jicong 已提交
43
  }
L
Liu Jicong 已提交
44
#endif
L
Liu Jicong 已提交
45 46
  pTq->tqMeta =
      tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
L
Liu Jicong 已提交
47
  if (pTq->tqMeta == NULL) {
L
Liu Jicong 已提交
48
    free(pTq);
L
Liu Jicong 已提交
49 50 51
#if 0
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
L
Liu Jicong 已提交
52 53
    return NULL;
  }
L
Liu Jicong 已提交
54

L
Liu Jicong 已提交
55 56 57 58 59 60 61
  pTq->tqPushMgr = tqPushMgrOpen();
  if (pTq->tqPushMgr == NULL) {
    // free store
    free(pTq);
    return NULL;
  }

L
Liu Jicong 已提交
62 63
  pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

L
Liu Jicong 已提交
64 65
  return pTq;
}
L
Liu Jicong 已提交
66

L
Liu Jicong 已提交
67
void tqClose(STQ* pTq) {
H
Hongze Cheng 已提交
68
  if (pTq) {
H
Hongze Cheng 已提交
69
    tfree(pTq->path);
H
Hongze Cheng 已提交
70 71
    free(pTq);
  }
L
Liu Jicong 已提交
72 73
  // TODO
}
L
Liu Jicong 已提交
74

L
Liu Jicong 已提交
75
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) {
L
Liu Jicong 已提交
76
  if (msgType != TDMT_VND_SUBMIT) return 0;
L
Liu Jicong 已提交
77 78 79
  void* data = malloc(msgLen);
  if (data == NULL) {
    return -1;
L
Liu Jicong 已提交
80
  }
L
Liu Jicong 已提交
81 82 83 84 85 86 87
  memcpy(data, msg, msgLen);
  SRpcMsg req = {
      .msgType = TDMT_VND_STREAM_TRIGGER,
      .pCont = data,
      .contLen = msgLen,
  };
  tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
L
Liu Jicong 已提交
88 89

#if 0
L
Liu Jicong 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
  void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
  while (pIter != NULL) {
    STqPusher* pusher = *(STqPusher**)pIter;
    if (pusher->type == TQ_PUSHER_TYPE__STREAM) {
      STqStreamPusher* streamPusher = (STqStreamPusher*)pusher;
      // repack
      STqStreamToken* token = malloc(sizeof(STqStreamToken));
      if (token == NULL) {
        taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
      token->type = TQ_STREAM_TOKEN__DATA;
      token->data = msg;
      // set input
      // exec
    }
    // send msg to ep
  }
L
Liu Jicong 已提交
109 110
  // iterate hash
  // process all msg
L
fix  
Liu Jicong 已提交
111 112
  // if waiting
  // memcpy and send msg to fetch thread
L
Liu Jicong 已提交
113 114 115 116
  // TODO: add reference
  // if handle waiting, launch query and response to consumer
  //
  // if no waiting handle, return
L
Liu Jicong 已提交
117
#endif
L
Liu Jicong 已提交
118 119 120
  return 0;
}

L
Liu Jicong 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); }

int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
  return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->logicalPlan) + strlen(pTopic->physicalPlan) +
         strlen(pTopic->qmsg) + sizeof(int64_t) * 3;
}

int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
  int     num = taosArrayGetSize(pConsumer->topics);
  int32_t sz = 0;
  for (int i = 0; i < num; i++) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    sz += tqGetTopicHandleSize(pTopic);
  }
  return sz;
}

static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) {
  int32_t tlen = 0;
  tlen += taosEncodeString(buf, pTopic->topicName);
  /*tlen += taosEncodeString(buf, pTopic->sql);*/
  /*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/
  /*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
  tlen += taosEncodeString(buf, pTopic->qmsg);
L
Liu Jicong 已提交
145 146 147
  /*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/
  /*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/
  /*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/
L
Liu Jicong 已提交
148 149 150 151 152 153 154 155 156
  return tlen;
}

static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
  buf = taosDecodeStringTo(buf, pTopic->topicName);
  /*buf = taosDecodeString(buf, &pTopic->sql);*/
  /*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/
  /*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
  buf = taosDecodeString(buf, &pTopic->qmsg);
L
Liu Jicong 已提交
157 158 159
  /*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/
  /*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/
  /*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/
L
Liu Jicong 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
  return buf;
}

static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) {
  int32_t sz;

  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
  tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
  tlen += taosEncodeString(buf, pConsumer->cgroup);
  sz = taosArrayGetSize(pConsumer->topics);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    tlen += tEncodeSTqTopic(buf, pTopic);
  }
  return tlen;
L
Liu Jicong 已提交
177 178
}

L
Liu Jicong 已提交
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) {
  int32_t sz;

  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
  buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
  if (pConsumer->topics == NULL) return NULL;
  for (int32_t i = 0; i < sz; i++) {
    STqTopic pTopic;
    buf = tDecodeSTqTopic(buf, &pTopic);
    taosArrayPush(pConsumer->topics, &pTopic);
  }
  return buf;
}

int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) {
  int32_t sz = tEncodeSTqConsumer(NULL, pConsumer);

L
Liu Jicong 已提交
199
  if (sz > (*ppHead)->ssize) {
L
Liu Jicong 已提交
200
    void* tmpPtr = realloc(*ppHead, sizeof(STqSerializedHead) + sz);
L
Liu Jicong 已提交
201 202
    if (tmpPtr == NULL) {
      free(*ppHead);
L
Liu Jicong 已提交
203
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
204 205 206 207 208 209 210
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }

  void* ptr = (*ppHead)->content;
L
Liu Jicong 已提交
211 212
  void* abuf = ptr;
  tEncodeSTqConsumer(&abuf, pConsumer);
L
Liu Jicong 已提交
213

L
Liu Jicong 已提交
214 215 216
  return 0;
}

L
Liu Jicong 已提交
217 218 219 220 221 222 223 224 225 226 227 228 229
int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) {
  const void* str = pHead->content;
  *ppConsumer = calloc(1, sizeof(STqConsumer));
  if (*ppConsumer == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  if (tDecodeSTqConsumer(str, *ppConsumer) == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  STqConsumer* pConsumer = *ppConsumer;
  int32_t      sz = taosArrayGetSize(pConsumer->topics);
L
Liu Jicong 已提交
230
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
231 232 233 234 235
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
    if (pTopic->pReadhandle == NULL) {
      ASSERT(false);
    }
L
Liu Jicong 已提交
236 237
    for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
      pTopic->buffer.output[j].status = 0;
L
Liu Jicong 已提交
238
      STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
L
Liu Jicong 已提交
239 240 241 242
      SReadHandle    handle = {
             .reader = pReadHandle,
             .meta = pTq->pVnodeMeta,
      };
L
Liu Jicong 已提交
243 244
      pTopic->buffer.output[j].pReadHandle = pReadHandle;
      pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
L
Liu Jicong 已提交
245
    }
L
Liu Jicong 已提交
246
  }
L
Liu Jicong 已提交
247 248

  return 0;
L
Liu Jicong 已提交
249 250
}

L
Liu Jicong 已提交
251 252 253 254 255
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq* pReq = pMsg->pCont;
  int64_t     consumerId = pReq->consumerId;
  int64_t     fetchOffset;
  int64_t     blockingTime = pReq->blockingTime;
256

L
Liu Jicong 已提交
257 258 259 260 261 262 263 264
  if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
    fetchOffset = 0;
  } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
    fetchOffset = walGetLastVer(pTq->pWal);
  } else {
    fetchOffset = pReq->currentOffset + 1;
  }

L
Liu Jicong 已提交
265
  SMqPollRsp rsp = {
L
Liu Jicong 已提交
266
      /*.consumerId = consumerId,*/
L
Liu Jicong 已提交
267 268 269
      .numOfTopics = 0,
      .pBlockData = NULL,
  };
270 271 272 273 274 275 276 277 278

  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  if (pConsumer == NULL) {
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    rpcSendResponse(pMsg);
    return 0;
  }
L
Liu Jicong 已提交
279

280 281 282 283 284 285
  int sz = taosArrayGetSize(pConsumer->topics);
  ASSERT(sz == 1);
  STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
  ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
  ASSERT(pConsumer->consumerId == consumerId);

L
Liu Jicong 已提交
286
  rsp.reqOffset = pReq->currentOffset;
287 288 289 290
  rsp.skipLogNum = 0;

  SWalHead* pHead;
  while (1) {
L
Liu Jicong 已提交
291
    /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
292
    if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
L
Liu Jicong 已提交
293 294
      // TODO: no more log, set timer to wait blocking time
      // if data inserted during waiting, launch query and
L
Liu Jicong 已提交
295
      // response to user
296 297
      break;
    }
L
Liu Jicong 已提交
298
    int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
299 300
    pHead = pTopic->pReadhandle->pHead;
    if (pHead->head.msgType == TDMT_VND_SUBMIT) {
S
Shengliang Guan 已提交
301
      SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
302
      qTaskInfo_t task = pTopic->buffer.output[pos].task;
303
      qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
304 305 306 307 308 309 310 311 312
      SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
      while (1) {
        SSDataBlock* pDataBlock;
        uint64_t     ts;
        if (qExecTask(task, &pDataBlock, &ts) < 0) {
          ASSERT(false);
        }
        if (pDataBlock == NULL) {
          fetchOffset++;
L
Liu Jicong 已提交
313
          pos = fetchOffset % TQ_BUFFER_SIZE;
314 315 316 317 318
          rsp.skipLogNum++;
          break;
        }

        taosArrayPush(pRes, pDataBlock);
L
Liu Jicong 已提交
319
        rsp.schema = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
320 321 322 323 324
        rsp.rspOffset = fetchOffset;

        rsp.numOfTopics = 1;
        rsp.pBlockData = pRes;

L
Liu Jicong 已提交
325
        int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
326 327 328 329 330
        void*   buf = rpcMallocCont(tlen);
        if (buf == NULL) {
          pMsg->code = -1;
          return -1;
        }
L
fix  
Liu Jicong 已提交
331 332
        ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
        ((SMqRspHead*)buf)->epoch = pReq->epoch;
L
Liu Jicong 已提交
333
        ((SMqRspHead*)buf)->consumerId = consumerId;
334

L
fix  
Liu Jicong 已提交
335
        void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
336
        tEncodeSMqPollRsp(&abuf, &rsp);
L
Liu Jicong 已提交
337
        /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
338 339 340 341 342 343 344 345 346 347 348 349
        pMsg->pCont = buf;
        pMsg->contLen = tlen;
        pMsg->code = 0;
        rpcSendResponse(pMsg);
        return 0;
      }
    } else {
      fetchOffset++;
      rsp.skipLogNum++;
    }
  }

L
Liu Jicong 已提交
350 351 352 353
  /*if (blockingTime != 0) {*/
  /*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
  /*} else {*/
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
354 355 356 357 358
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    return -1;
  }
L
Liu Jicong 已提交
359 360
  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
361

L
Liu Jicong 已提交
362
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
363
  tEncodeSMqPollRsp(&abuf, &rsp);
364 365 366 367 368
  rsp.pBlockData = NULL;
  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  rpcSendResponse(pMsg);
L
Liu Jicong 已提交
369 370
  /*}*/

371 372 373
  return 0;
}

L
Liu Jicong 已提交
374 375 376 377
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
  SMqMVRebReq req = {0};
  tDecodeSMqMVRebReq(msg, &req);

L
Liu Jicong 已提交
378
  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
L
Liu Jicong 已提交
379
  ASSERT(pConsumer);
L
Liu Jicong 已提交
380
  pConsumer->consumerId = req.newConsumerId;
L
Liu Jicong 已提交
381 382 383 384 385 386 387
  tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, req.newConsumerId);
  tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

L
Liu Jicong 已提交
388
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
L
Liu Jicong 已提交
389
  SMqSetCVgReq req = {0};
L
Liu Jicong 已提交
390
  tDecodeSMqSetCVgReq(msg, &req);
L
Liu Jicong 已提交
391

392
  /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
L
Liu Jicong 已提交
393
  STqConsumer* pConsumer = calloc(1, sizeof(STqConsumer));
L
Liu Jicong 已提交
394
  if (pConsumer == NULL) {
L
Liu Jicong 已提交
395 396
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
L
Liu Jicong 已提交
397
  }
L
Liu Jicong 已提交
398

L
Liu Jicong 已提交
399
  strcpy(pConsumer->cgroup, req.cgroup);
L
Liu Jicong 已提交
400
  pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
L
Liu Jicong 已提交
401
  pConsumer->consumerId = req.consumerId;
L
Liu Jicong 已提交
402
  pConsumer->epoch = 0;
L
Liu Jicong 已提交
403

L
Liu Jicong 已提交
404
  STqTopic* pTopic = calloc(1, sizeof(STqTopic));
L
Liu Jicong 已提交
405
  if (pTopic == NULL) {
L
Liu Jicong 已提交
406
    taosArrayDestroy(pConsumer->topics);
L
Liu Jicong 已提交
407 408 409
    free(pConsumer);
    return -1;
  }
L
Liu Jicong 已提交
410
  strcpy(pTopic->topicName, req.topicName);
L
Liu Jicong 已提交
411 412 413
  pTopic->sql = req.sql;
  pTopic->logicalPlan = req.logicalPlan;
  pTopic->physicalPlan = req.physicalPlan;
L
Liu Jicong 已提交
414
  pTopic->qmsg = req.qmsg;
L
Liu Jicong 已提交
415 416
  /*pTopic->committedOffset = -1;*/
  /*pTopic->currentOffset = -1;*/
417

L
Liu Jicong 已提交
418 419
  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
L
Liu Jicong 已提交
420 421
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  if (pTopic->pReadhandle == NULL) {
L
Liu Jicong 已提交
422
    ASSERT(false);
L
Liu Jicong 已提交
423
  }
L
Liu Jicong 已提交
424 425
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
L
Liu Jicong 已提交
426
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
L
Liu Jicong 已提交
427 428 429 430
    SReadHandle    handle = {
           .reader = pReadHandle,
           .meta = pTq->pVnodeMeta,
    };
L
Liu Jicong 已提交
431
    pTopic->buffer.output[i].pReadHandle = pReadHandle;
L
Liu Jicong 已提交
432
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
L
Liu Jicong 已提交
433
  }
L
Liu Jicong 已提交
434
  taosArrayPush(pConsumer->topics, pTopic);
L
Liu Jicong 已提交
435 436
  tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, req.consumerId);
L
Liu Jicong 已提交
437
  terrno = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
438 439
  return 0;
}
L
Liu Jicong 已提交
440

L
Liu Jicong 已提交
441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
  ASSERT(parallel <= 8);
  pTask->numOfRunners = parallel;
  for (int32_t i = 0; i < parallel; i++) {
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
    SReadHandle    handle = {
           .reader = pReadHandle,
           .meta = pTq->pVnodeMeta,
    };
    pTask->runner[i].inputHandle = pReadHandle;
    pTask->runner[i].executor = qCreateStreamExecTaskInfo(pTask->qmsg, &handle);
  }
  return 0;
}

L
Liu Jicong 已提交
456 457 458 459 460 461 462
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamTask* pTask = malloc(sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SCoder decoder;
  tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
L
Liu Jicong 已提交
463 464 465
  if (tDecodeSStreamTask(&decoder, pTask) < 0) {
    ASSERT(0);
  }
L
Liu Jicong 已提交
466 467
  tCoderClear(&decoder);

L
Liu Jicong 已提交
468
  tqExpandTask(pTq, pTask, 8);
L
Liu Jicong 已提交
469 470 471 472
  taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));

  return 0;
}
L
Liu Jicong 已提交
473

L
Liu Jicong 已提交
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
static char* formatTimestamp(char* buf, int64_t val, int precision) {
  time_t  tt;
  int32_t ms = 0;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    tt = (time_t)(val / 1000000000);
    ms = val % 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    tt = (time_t)(val / 1000000);
    ms = val % 1000000;
  } else {
    tt = (time_t)(val / 1000);
    ms = val % 1000;
  }

  /* comment out as it make testcases like select_with_tags.sim fail.
    but in windows, this may cause the call to localtime crash if tt < 0,
    need to find a better solution.
    if (tt < 0) {
      tt = 0;
    }
    */

#ifdef WINDOWS
  if (tt < 0) tt = 0;
#endif
  if (tt <= 0 && ms < 0) {
    tt--;
    if (precision == TSDB_TIME_PRECISION_NANO) {
      ms += 1000000000;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      ms += 1000000;
    } else {
      ms += 1000;
    }
  }

  struct tm* ptm = localtime(&tt);
  size_t     pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);

  if (precision == TSDB_TIME_PRECISION_NANO) {
    sprintf(buf + pos, ".%09d", ms);
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    sprintf(buf + pos, ".%06d", ms);
  } else {
    sprintf(buf + pos, ".%03d", ms);
  }

  return buf;
}
void tqDebugShowSSData(SArray* dataBlocks) {
  char    pBuf[128];
  int32_t sz = taosArrayGetSize(dataBlocks);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
    int32_t      colNum = pDataBlock->info.numOfCols;
    int32_t      rows = pDataBlock->info.rows;
    for (int32_t j = 0; j < rows; j++) {
      printf("|");
      for (int32_t k = 0; k < colNum; k++) {
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
            printf(" %25s |", pBuf);
            break;
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
L
Liu Jicong 已提交
542 543 544 545 546
            printf(" %15d |", *(int32_t*)var);
            break;
          case TSDB_DATA_TYPE_BIGINT:
          case TSDB_DATA_TYPE_UBIGINT:
            printf(" %15ld |", *(int64_t*)var);
L
Liu Jicong 已提交
547 548 549 550 551 552 553 554
            break;
        }
      }
      printf("\n");
    }
  }
}

L
Liu Jicong 已提交
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
  void* pIter = NULL;

  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = (SStreamTask*)pIter;
    if (!pTask->pipeSource) continue;

    int32_t workerId = 0;
    void*   exec = pTask->runner[workerId].executor;
    qSetStreamInput(exec, data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    while (1) {
      SSDataBlock* output;
      uint64_t     ts;
      if (qExecTask(exec, &output, &ts) < 0) {
        ASSERT(false);
      }
      if (output == NULL) {
        break;
      }
      taosArrayPush(pRes, output);
    }
    if (pTask->pipeSink) {
      // write back
      /*printf("reach end\n");*/
      tqDebugShowSSData(pRes);
    } else {
      int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes);
      void*   buf = rpcMallocCont(tlen);
      if (buf == NULL) {
        return -1;
      }
      void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead));
      tEncodeDataBlocks(abuf, pRes);
      tmsg_t type;

      if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) {
        type = TDMT_VND_TASK_EXEC;
      } else {
        type = TDMT_SND_TASK_EXEC;
      }

      SRpcMsg reqMsg = {
          .pCont = buf,
          .contLen = tlen,
          .code = 0,
          .msgType = type,
      };
      tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &reqMsg);
    }
  }
  return 0;
}

L
Liu Jicong 已提交
611
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
L
Liu Jicong 已提交
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640
  SStreamTaskExecReq* pReq = msg->pCont;

  int32_t taskId = pReq->head.streamTaskId;
  int32_t workerType = pReq->head.workerType;

  SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  // assume worker id is 1
  int32_t workerId = 1;
  void*   exec = pTask->runner[workerId].executor;
  int32_t sz = taosArrayGetSize(pReq->data);
  printf("input data:\n");
  tqDebugShowSSData(pReq->data);
  SArray* pRes = taosArrayInit(0, sizeof(void*));
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* input = taosArrayGet(pReq->data, i);
    SSDataBlock* output;
    uint64_t     ts;
    qSetStreamInput(exec, input, STREAM_DATA_TYPE_SSDATA_BLOCK);
    if (qExecTask(exec, &output, &ts) < 0) {
      ASSERT(0);
    }
    if (output == NULL) {
      break;
    }
    taosArrayPush(pRes, &output);
  }
  printf("output data:\n");
  tqDebugShowSSData(pRes);

L
Liu Jicong 已提交
641 642
  return 0;
}