tstream.c 20.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tstream.h"
#include "executor.h"

L
Liu Jicong 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
// Serializes a stream data block through the encode cursor `buf`:
// type (i8), sourceVg (i32), sourceVer (i64), then the block array via
// tEncodeDataBlocks(). Only STREAM_INPUT__DATA_BLOCK payloads are
// supported (asserted).
// Returns the sum of the lengths reported by the individual encoders.
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
  int32_t encodedLen = taosEncodeFixedI8(buf, pOutput->type);
  encodedLen += taosEncodeFixedI32(buf, pOutput->sourceVg);
  encodedLen += taosEncodeFixedI64(buf, pOutput->sourceVer);

  ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK);

  encodedLen += tEncodeDataBlocks(buf, pOutput->blocks);
  return encodedLen;
}

// Reads back what streamDataBlockEncode() wrote, in the same field order:
// type (i8), sourceVg (i32), sourceVer (i64), then the block array.
// Asserts that the decoded type is STREAM_INPUT__DATA_BLOCK.
// Returns the read position after the last decoded field.
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
  const void* cursor = buf;

  cursor = taosDecodeFixedI8(cursor, &pInput->type);
  cursor = taosDecodeFixedI32(cursor, &pInput->sourceVg);
  cursor = taosDecodeFixedI64(cursor, &pInput->sourceVer);

  ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);

  cursor = tDecodeDataBlocks(cursor, &pInput->blocks);
  return (void*)cursor;
}

L
Liu Jicong 已提交
38
// Builds the RPC message that forwards `data` to the downstream task of pTask.
//
// The destination depends on pTask->dispatchType:
//   - INPLACE: head vgId 0, task id from inplaceDispatcher; *ppEpSet is not
//              touched (callers may pass NULL).
//   - FIXED:   head vgId / ep set / task id from fixedEpDispatcher.
//   - SHUFFLE: "<stbFullName>:<groupId>" is hashed with MurmurHash3_32 and the
//              vgroup whose [hashBegin, hashEnd] range contains the hash is
//              chosen. The group id is taken from the first block only.
//
// On success pMsg->pCont owns a buffer obtained from rpcMallocCont() and 0 is
// returned. On failure -1 is returned and no buffer is held; *ppEpSet may
// already have been written in the FIXED/SHUFFLE cases and must not be used.
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
  SStreamTaskExecReq req = {
      .streamId = pTask->streamId,
      .data = data,
  };

  // Resolve the destination BEFORE allocating, so that a failure while
  // resolving cannot leak the RPC buffer.
  bool    hasTarget = false;  // false for dispatch types not handled here
  int32_t headVgId = 0;       // stored verbatim into SMsgHead.vgId

  if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
    req.taskId = pTask->inplaceDispatcher.taskId;
    headVgId = 0;
    hasTarget = true;

  } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
    req.taskId = pTask->fixedEpDispatcher.taskId;
    *ppEpSet = &pTask->fixedEpDispatcher.epSet;
    headVgId = htonl(pTask->fixedEpDispatcher.nodeId);
    hasTarget = true;

  } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
    // TODO use general name rule of schemaless
    char ctbName[TSDB_TABLE_FNAME_LEN + 22];

    // The group id is read from element 0, so an empty array cannot be routed.
    if (taosArrayGetSize(data) <= 0) {
      return -1;
    }

    // all groupId must be the same in an array
    SSDataBlock* pBlock = taosArrayGet(data, 0);

    // Bounded formatting; the explicit cast keeps the conversion specifier and
    // the argument type in agreement regardless of the width of `long`.
    int32_t nameLen = snprintf(ctbName, sizeof(ctbName), "%s:%lld", pTask->shuffleDispatcher.stbFullName,
                               (long long)pBlock->info.groupId);
    if (nameLen < 0 || nameLen >= (int32_t)sizeof(ctbName)) {
      // A truncated name would hash to an arbitrary vgroup; refuse instead.
      return -1;
    }

    // TODO: get hash function by hashMethod
    uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));

    // get node
    // TODO: optimize search process
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t sz = taosArrayGetSize(vgInfo);
    int32_t nodeId = 0;
    for (int32_t i = 0; i < sz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
        nodeId = pVgInfo->vgId;
        req.taskId = pVgInfo->taskId;
        *ppEpSet = &pVgInfo->epSet;
        break;
      }
    }

    ASSERT(nodeId != 0);
    if (nodeId == 0) {
      // No vgroup covers this hash: fail instead of sending to vgId 0 with an
      // unset endpoint when assertions are compiled out.
      return -1;
    }
    headVgId = htonl(nodeId);
    hasTarget = true;
  }

  // All encoded fields are written by the same encoders regardless of the
  // value of req.taskId, so the length can be computed now.
  int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

  if (hasTarget) {
    ((SMsgHead*)buf)->vgId = headVgId;
  }

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSStreamTaskExecReq(&abuf, &req);

  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  pMsg->msgType = pTask->dispatchMsgType;
  pMsg->info.noResp = 1;

  return 0;
}

// Sends one exec message per entry of `data`. Each hash value is read as an
// SArray* (the blocks of one group) and is routed by streamBuildExecMsg().
// pMsgCb is not used by this function.
// Returns 0 when every entry was handed to tmsgSendReq(), -1 as soon as a
// message cannot be built (iteration stops at that entry).
static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) {
  void* pIter = NULL;
  while ((pIter = taosHashIterate(data, pIter)) != NULL) {
    SArray* pData = *(SArray**)pIter;
    SRpcMsg dispatchMsg = {0};
    // Start from NULL so an endpoint that was never filled in is a
    // well-defined value rather than stack garbage.
    SEpSet* pEpSet = NULL;

    if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
      ASSERT(0);
      return -1;
    }
    ASSERT(pEpSet != NULL);
    tmsgSendReq(pEpSet, &dispatchMsg);
  }
  return 0;
}

L
Liu Jicong 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
// Offers a submit-type input item to the task.
// The item is ref-counted up and written to inputQ only while the task's
// input status is TASK_INPUT_STATUS__NORMAL; otherwise it is left untouched.
// Returns the input status that was observed.
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input) {
  ASSERT(pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK);

  int8_t observed = atomic_load_8(&pTask->inputStatus);
  if (observed != TASK_INPUT_STATUS__NORMAL) {
    return observed;
  }

  streamDataSubmitRefInc(input);
  taosWriteQitem(pTask->inputQ, input);
  return observed;
}

// Writes a data-block input item to inputQ unconditionally, then reports the
// task's input status as read after the write.
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
  ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);

  taosWriteQitem(pTask->inputQ, input);

  int8_t statusAfterWrite = atomic_load_8(&pTask->inputStatus);
  return statusAfterWrite;
}

L
Liu Jicong 已提交
136
// Runs the task's executor over one input item and appends every produced
// block (by value) to pRes.
//
// `data` is interpreted according to pTask->inputType: a SStreamDataSubmit for
// STREAM_INPUT__DATA_SUBMIT, a SStreamDataBlock for STREAM_INPUT__DATA_BLOCK.
// The input item is always consumed before returning: the submit reference is
// dropped, or the block array is destroyed and the queue item freed.
//
// Returns 0 on success, -1 if the executor failed or an output block could not
// be copied (blocks already appended to pRes stay there).
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
  void*   exec = pTask->exec.executor;
  int32_t code = 0;

  // set input
  if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
    ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT);

    qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
  } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) {
    SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
    ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);

    SArray* blocks = pBlock->blocks;
    qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
  }

  // exec: pull blocks until the executor reports no more output
  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if (qExecTask(exec, &output, &ts) < 0) {
      ASSERT(false);
      code = -1;
      break;
    }
    if (output == NULL) break;

    SSDataBlock* outputCopy = createOneDataBlock(output, true);
    if (outputCopy == NULL) {
      code = -1;
      break;
    }

    // pRes stores SSDataBlock elements by value, so the push copies the struct
    // and the heap shell returned by createOneDataBlock() is no longer needed.
    // Only the shell is released; whatever the struct points to now belongs to
    // the element inside pRes.
    // NOTE(review): assumes createOneDataBlock() allocates the shell with the
    // taosMemory* allocator - confirm.
    taosArrayPush(pRes, outputCopy);
    taosMemoryFree(outputCopy);
  }

  // destroy the consumed input
  if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
    streamDataSubmitRefDec((SStreamDataSubmit*)data);
  } else {
    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
    taosFreeQitem(data);
  }
  return code;
}

// TODO: handle version
L
Liu Jicong 已提交
177
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
L
Liu Jicong 已提交
178 179 180 181
  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
  if (pRes == NULL) return -1;
  while (1) {
    int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
L
Liu Jicong 已提交
182
    void*  exec = pTask->exec.executor;
L
Liu Jicong 已提交
183 184 185 186 187 188 189 190 191
    if (execStatus == TASK_STATUS__IDLE) {
      // first run, from qall, handle failure from last exec
      while (1) {
        void* data = NULL;
        taosGetQitem(pTask->inputQAll, &data);
        if (data == NULL) break;

        streamTaskExecImpl(pTask, data, pRes);

L
Liu Jicong 已提交
192
        /*taosFreeQitem(data);*/
L
Liu Jicong 已提交
193 194

        if (taosArrayGetSize(pRes) != 0) {
L
Liu Jicong 已提交
195
          SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
L
Liu Jicong 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
          resQ->type = STREAM_INPUT__DATA_BLOCK;
          resQ->blocks = pRes;
          taosWriteQitem(pTask->outputQ, resQ);
          pRes = taosArrayInit(0, sizeof(SSDataBlock));
          if (pRes == NULL) goto FAIL;
        }
      }
      // second run, from inputQ
      taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
      while (1) {
        void* data = NULL;
        taosGetQitem(pTask->inputQAll, &data);
        if (data == NULL) break;

        streamTaskExecImpl(pTask, data, pRes);

L
Liu Jicong 已提交
212
        /*taosFreeQitem(data);*/
L
Liu Jicong 已提交
213 214

        if (taosArrayGetSize(pRes) != 0) {
L
Liu Jicong 已提交
215
          SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
L
Liu Jicong 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
          resQ->type = STREAM_INPUT__DATA_BLOCK;
          resQ->blocks = pRes;
          taosWriteQitem(pTask->outputQ, resQ);
          pRes = taosArrayInit(0, sizeof(SSDataBlock));
          if (pRes == NULL) goto FAIL;
        }
      }
      // set status closing
      atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
      // third run, make sure all inputQ is cleared
      taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
      while (1) {
        void* data = NULL;
        taosGetQitem(pTask->inputQAll, &data);
        if (data == NULL) break;

        streamTaskExecImpl(pTask, data, pRes);

L
Liu Jicong 已提交
234
        /*taosFreeQitem(data);*/
L
Liu Jicong 已提交
235 236

        if (taosArrayGetSize(pRes) != 0) {
L
Liu Jicong 已提交
237
          SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
L
Liu Jicong 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
          resQ->type = STREAM_INPUT__DATA_BLOCK;
          resQ->blocks = pRes;
          taosWriteQitem(pTask->outputQ, resQ);
          pRes = taosArrayInit(0, sizeof(SSDataBlock));
          if (pRes == NULL) goto FAIL;
        }
      }
      // set status closing
      atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
      // third run, make sure all inputQ is cleared
      taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
      while (1) {
        void* data = NULL;
        taosGetQitem(pTask->inputQAll, &data);
        if (data == NULL) break;
L
Liu Jicong 已提交
253 254 255 256 257 258

        streamTaskExecImpl(pTask, data, pRes);

        taosFreeQitem(data);

        if (taosArrayGetSize(pRes) != 0) {
L
Liu Jicong 已提交
259
          SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
L
Liu Jicong 已提交
260 261 262 263 264 265
          resQ->type = STREAM_INPUT__DATA_BLOCK;
          resQ->blocks = pRes;
          taosWriteQitem(pTask->outputQ, resQ);
          pRes = taosArrayInit(0, sizeof(SSDataBlock));
          if (pRes == NULL) goto FAIL;
        }
L
Liu Jicong 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
      }

      atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
      break;
    } else if (execStatus == TASK_STATUS__CLOSING) {
      continue;
    } else if (execStatus == TASK_STATUS__EXECUTING) {
      break;
    } else {
      ASSERT(0);
    }
  }
  return 0;
FAIL:
  atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
  return -1;
}

L
Liu Jicong 已提交
284
// Drains the task's output: every result batch is first given to the
// configured sink, then - while outputStatus is NORMAL - dispatched downstream.
//
// The first pass consumes what is already in outputQAll; once that is empty,
// outputQ is moved into outputQAll before every fetch until nothing is left.
//
// Returns 0 when the output is exhausted, -1 when a dispatch message, hash
// table or array cannot be created.
//
// NOTE(review): neither the dequeued item nor its block array is released in
// this function, and the shuffle hash table / per-group arrays are never
// cleaned up here - confirm who owns them.
int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
  bool firstRun = true;
  while (1) {
    SStreamDataBlock* pBlock = NULL;
    if (!firstRun) {
      taosReadAllQitems(pTask->outputQ, pTask->outputQAll);
    }
    taosGetQitem(pTask->outputQAll, (void**)&pBlock);
    if (pBlock == NULL) {
      if (!firstRun) break;
      // outputQAll is exhausted: switch to pulling from outputQ.
      firstRun = false;
      continue;
    }

    SArray* pRes = pBlock->blocks;

    // sink
    if (pTask->sinkType == TASK_SINK__TABLE) {
      // blockDebugShowData(pRes);
      pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
    } else if (pTask->sinkType == TASK_SINK__SMA) {
      pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
    } else if (pTask->sinkType == TASK_SINK__FETCH) {
      // nothing to do here
    } else {
      ASSERT(pTask->sinkType == TASK_SINK__NONE);
    }

    // dispatch
    // TODO dispatch guard
    int8_t outputStatus = atomic_load_8(&pTask->outputStatus);
    if (outputStatus != TASK_OUTPUT_STATUS__NORMAL) {
      // The batch has been sunk but is not dispatched.
      continue;
    }

    if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
      // Pick the target queue first so that an unknown message type is
      // rejected before a message buffer is allocated.
      int32_t qType;
      if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
        qType = FETCH_QUEUE;
      } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
                 pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
        qType = MERGE_QUEUE;
      } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
        qType = WRITE_QUEUE;
      } else {
        ASSERT(0);
        return -1;
      }

      SRpcMsg dispatchMsg = {0};
      if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
        ASSERT(0);
        return -1;
      }
      tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);

    } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
      SRpcMsg dispatchMsg = {0};
      SEpSet* pEpSet = NULL;
      if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
        ASSERT(0);
        return -1;
      }
      tmsgSendReq(pEpSet, &dispatchMsg);

    } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
      // Group the blocks by groupId: key = groupId, value = SArray* of blocks.
      // NOTE(review): the hash function is chosen for TSDB_DATA_TYPE_INT while
      // the key length is sizeof(int64_t) - confirm this is intended.
      SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
      if (pShuffleRes == NULL) {
        return -1;
      }

      int32_t sz = taosArrayGetSize(pRes);
      for (int32_t i = 0; i < sz; i++) {
        SSDataBlock* pDataBlock = taosArrayGet(pRes, i);

        // The table stores the SArray* itself (put with &pArray and
        // sizeof(void*)), so a lookup yields a pointer to that slot and must
        // be dereferenced once to reach the array.
        SArray** ppArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
        SArray*  pArray = (ppArray != NULL) ? *ppArray : NULL;
        if (pArray == NULL) {
          pArray = taosArrayInit(0, sizeof(SSDataBlock));
          if (pArray == NULL) {
            return -1;
          }
          taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
        }
        taosArrayPush(pArray, pDataBlock);
      }

      if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
        return -1;
      }

    } else {
      ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    }
  }
  return 0;
}

L
Liu Jicong 已提交
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
// Accepts a dispatch request from an upstream task: enqueues a data-block item
// on inputQ and answers the sender with the task's current input status.
//
// If the queue item cannot be allocated, the task's input status is set to
// TASK_INPUT_STATUS__FAILED, nothing is enqueued, and FAILED is reported back.
//
// Returns 0 when the item was enqueued and the response was sent, -1 when the
// item or the response buffer could not be allocated.
//
// NOTE(review): pBlock->blocks is never filled from pReq here, yet the
// consumer (streamTaskExecImpl) dereferences it - confirm where the payload of
// the request is supposed to be attached.
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
  SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
  int8_t            status;

  // 1.1 update status
  // TODO cal backpressure
  if (pBlock == NULL) {
    atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
    status = TASK_INPUT_STATUS__FAILED;
  } else {
    status = atomic_load_8(&pTask->inputStatus);

    // 1.2 enqueue (only when there is an item to enqueue)
    // Items taken from inputQ are asserted to carry STREAM_INPUT__DATA_BLOCK
    // by the executor, so tag the item with that constant.
    pBlock->type = STREAM_INPUT__DATA_BLOCK;
    pBlock->sourceVg = pReq->sourceVg;
    /*pBlock->sourceVer = pReq->sourceVer;*/
    taosWriteQitem(pTask->inputQ, pBlock);
  }

  // 1.3 rsp by input status
  SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
  if (pCont == NULL) {
    return -1;
  }
  pCont->inputStatus = status;
  pCont->streamId = pReq->streamId;
  pCont->taskId = pReq->sourceTaskId;
  pRsp->pCont = pCont;
  pRsp->contLen = sizeof(SStreamDispatchRsp);
  tmsgSendRsp(pRsp);

  return (pBlock == NULL) ? -1 : 0;
}

L
Liu Jicong 已提交
413
// Handles a dispatch request end to end. The results of the three stages are
// not inspected; the function always returns 0.
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
  // Stage 1 - input: enqueue the request and reply to the sender.
  (void)streamTaskEnqueue(pTask, pReq, pRsp);

  // Stage 2 - exec, depending on the task status:
  //   idle      -> execute
  //   executing -> return
  //   closing   -> keep trying
  (void)streamExec(pTask, pMsgCb);

  // Stage 3 - output: check and set status, then dispatch / sink.
  (void)streamSink(pTask, pMsgCb);

  return 0;
}

L
Liu Jicong 已提交
431
// Handles the response to an earlier dispatch: records the reported input
// status on the task, then resumes sinking/dispatching. Always returns 0.
//
// NOTE(review): the reported status is stored into pTask->inputStatus, while
// streamSink() gates dispatching on pTask->outputStatus - confirm which field
// a downstream response is meant to update.
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
  atomic_store_8(&pTask->inputStatus, pRsp->inputStatus);

  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    // TODO: init recover timer
  }

  // continue dispatch
  (void)streamSink(pTask, pMsgCb);
  return 0;
}

// Handles a run request: executes pending input, then sinks/dispatches the
// produced output. The results of both steps are ignored; always returns 0.
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
  (void)streamExec(pTask, pMsgCb);
  (void)streamSink(pTask, pMsgCb);
  return 0;
}

L
Liu Jicong 已提交
447
// Placeholder for recover-request handling: no argument is used and the
// function always returns 0.
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
  // not implemented yet
  return 0;
}

L
Liu Jicong 已提交
452
// Placeholder for recover-response handling: no argument is used and the
// function always returns 0.
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
  // not implemented yet
  return 0;
}

L
Liu Jicong 已提交
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
// Serializes an exec request: streamId (i64), taskId (i32), then the data
// blocks. `buf` is passed through to the underlying encoders unchanged;
// streamBuildExecMsg() calls this with NULL to obtain the length only.
// Returns the sum of the lengths reported by the individual encoders.
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
  int32_t total = taosEncodeFixedI64(buf, pReq->streamId);
  total += taosEncodeFixedI32(buf, pReq->taskId);
  total += tEncodeDataBlocks(buf, pReq->data);
  return total;
}

// Reads an exec request in the field order used by
// tEncodeSStreamTaskExecReq(): streamId (i64), taskId (i32), data blocks.
// Returns the read position after the last decoded field.
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
  const void* cursor = buf;

  cursor = taosDecodeFixedI64(cursor, &pReq->streamId);
  cursor = taosDecodeFixedI32(cursor, &pReq->taskId);
  cursor = tDecodeDataBlocks(cursor, &pReq->data);

  return (void*)cursor;
}

// Releases the block array of a request with taosArrayDestroy(); the request
// struct itself is not freed.
// NOTE(review): other paths in this file destroy block arrays with
// taosArrayDestroyEx(..., tDeleteSSDataBlock) - confirm that a plain destroy
// is sufficient for decoded blocks.
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
  taosArrayDestroy(pReq->data);
}
L
Liu Jicong 已提交
473 474

SStreamTask* tNewSStreamTask(int64_t streamId) {
wafwerar's avatar
wafwerar 已提交
475
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
L
Liu Jicong 已提交
476 477 478 479 480
  if (pTask == NULL) {
    return NULL;
  }
  pTask->taskId = tGenIdPI32();
  pTask->streamId = streamId;
L
Liu Jicong 已提交
481 482
  pTask->status = TASK_STATUS__IDLE;

L
Liu Jicong 已提交
483 484 485
  return pTask;
}

H
Hongze Cheng 已提交
486
// Serializes a stream task. Layout, in order:
//   streamId, taskId, inputType, status, sourceType, execType, sinkType,
//   dispatchType, dispatchMsgType, nodeId, epSet,
//   [exec: parallelizable, qmsg]   when execType != TASK_EXEC__NONE,
//   sink-specific fields           selected by sinkType,
//   dispatcher-specific fields     selected by dispatchType.
// The tStartEncode/tEndEncode framing is intentionally not used.
// Returns pEncoder->pos on success, -1 as soon as any field fails to encode.
//
// NOTE(review): for the shuffle dispatcher only dbInfo is written;
// shuffleDispatcher.stbFullName, which streamBuildExecMsg() reads, is not part
// of the encoding - confirm it is restored elsewhere.
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  // common header
  if (tEncodeI64(pEncoder, pTask->streamId) < 0 ||
      tEncodeI32(pEncoder, pTask->taskId) < 0 ||
      tEncodeI8(pEncoder, pTask->inputType) < 0 ||
      tEncodeI8(pEncoder, pTask->status) < 0 ||
      tEncodeI8(pEncoder, pTask->sourceType) < 0 ||
      tEncodeI8(pEncoder, pTask->execType) < 0 ||
      tEncodeI8(pEncoder, pTask->sinkType) < 0 ||
      tEncodeI8(pEncoder, pTask->dispatchType) < 0 ||
      tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) {
    return -1;
  }

  // location
  if (tEncodeI32(pEncoder, pTask->nodeId) < 0 ||
      tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) {
    return -1;
  }

  // executor
  if (pTask->execType != TASK_EXEC__NONE) {
    if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0 ||
        tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) {
      return -1;
    }
  }

  // sink
  switch (pTask->sinkType) {
    case TASK_SINK__TABLE:
      if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0 ||
          tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0 ||
          tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) {
        return -1;
      }
      break;
    case TASK_SINK__SMA:
      if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
      break;
    case TASK_SINK__FETCH:
      if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
      break;
    default:
      ASSERT(pTask->sinkType == TASK_SINK__NONE);
      break;
  }

  // dispatcher
  switch (pTask->dispatchType) {
    case TASK_DISPATCH__INPLACE:
      if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1;
      break;
    case TASK_DISPATCH__FIXED:
      if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0 ||
          tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0 ||
          tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) {
        return -1;
      }
      break;
    case TASK_DISPATCH__SHUFFLE:
      // hashMethod is not serialized
      if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
      break;
    default:
      break;
  }

  return pEncoder->pos;
}

H
Hongze Cheng 已提交
533
// Deserializes a stream task, reading fields in the order written by
// tEncodeSStreamTask(): common header, nodeId/epSet, optional executor
// section, then the sink- and dispatcher-specific sections.
// exec.qmsg is decoded with tDecodeCStrAlloc() and, for a table sink,
// tbSink.pSchemaWrapper is allocated here with taosMemoryCalloc().
// The tStartDecode/tEndDecode framing is intentionally not used.
// Returns 0 on success, -1 as soon as any field fails to decode or the schema
// wrapper cannot be allocated; anything allocated before the failure is left
// attached to pTask.
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  // common header
  if (tDecodeI64(pDecoder, &pTask->streamId) < 0 ||
      tDecodeI32(pDecoder, &pTask->taskId) < 0 ||
      tDecodeI8(pDecoder, &pTask->inputType) < 0 ||
      tDecodeI8(pDecoder, &pTask->status) < 0 ||
      tDecodeI8(pDecoder, &pTask->sourceType) < 0 ||
      tDecodeI8(pDecoder, &pTask->execType) < 0 ||
      tDecodeI8(pDecoder, &pTask->sinkType) < 0 ||
      tDecodeI8(pDecoder, &pTask->dispatchType) < 0 ||
      tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) {
    return -1;
  }

  // location
  if (tDecodeI32(pDecoder, &pTask->nodeId) < 0 ||
      tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) {
    return -1;
  }

  // executor
  if (pTask->execType != TASK_EXEC__NONE) {
    if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0 ||
        tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) {
      return -1;
    }
  }

  // sink
  switch (pTask->sinkType) {
    case TASK_SINK__TABLE:
      if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0 ||
          tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) {
        return -1;
      }
      pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
      if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
      if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
      break;
    case TASK_SINK__SMA:
      if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
      break;
    case TASK_SINK__FETCH:
      if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
      break;
    default:
      ASSERT(pTask->sinkType == TASK_SINK__NONE);
      break;
  }

  // dispatcher
  switch (pTask->dispatchType) {
    case TASK_DISPATCH__INPLACE:
      if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1;
      break;
    case TASK_DISPATCH__FIXED:
      if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0 ||
          tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0 ||
          tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) {
        return -1;
      }
      break;
    case TASK_DISPATCH__SHUFFLE:
      // hashMethod is not serialized
      if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
      break;
    default:
      break;
  }

  return 0;
}

// Tears down a stream task: closes inputQ and outputQ, frees the query message
// if one is attached, destroys the executor and finally frees the task itself.
// TODO: other members of the task are not released here.
void tFreeSStreamTask(SStreamTask* pTask) {
  taosCloseQueue(pTask->inputQ);
  taosCloseQueue(pTask->outputQ);

  if (pTask->exec.qmsg != NULL) {
    taosMemoryFree(pTask->exec.qmsg);
  }
  qDestroyTask(pTask->exec.executor);

  taosMemoryFree(pTask);
}

// Disabled (compiled out) SCoder-based variants of the exec-request
// encode/decode/free helpers. The active implementations are the
// buffer-based functions of the same names defined earlier in this file.
// As written, the decode stub refers to pEncoder, which is not one of its
// parameters, so this section would not compile if it were enabled.
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
  /*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
  return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
  return pEncoder->size;
}
void    tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
  taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif