tstream.h 15.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
L
Liu Jicong 已提交
17
#include "os.h"
L
Liu Jicong 已提交
18
#include "query.h"
19
#include "streamState.h"
L
Liu Jicong 已提交
20
#include "tdatablock.h"
L
Liu Jicong 已提交
21
#include "tdbInt.h"
L
Liu Jicong 已提交
22 23
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
24
#include "tqueue.h"
L
Liu Jicong 已提交
25 26 27 28 29 30
#include "trpc.h"

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
31 32
#ifndef _STREAM_H_
#define _STREAM_H_
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34 35
typedef struct SStreamTask SStreamTask;

36 37
enum {
  STREAM_STATUS__NORMAL = 0,
L
Liu Jicong 已提交
38
  STREAM_STATUS__STOP,
L
Liu Jicong 已提交
39
  STREAM_STATUS__INIT,
L
Liu Jicong 已提交
40
  STREAM_STATUS__FAILED,
41 42 43
  STREAM_STATUS__RECOVER,
};

L
Liu Jicong 已提交
44
enum {
L
Liu Jicong 已提交
45 46
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
L
Liu Jicong 已提交
47 48
  TASK_STATUS__FAIL,
  TASK_STATUS__STOP,
49
  TASK_STATUS__WAIT_DOWNSTREAM,
50 51 52
  TASK_STATUS__RECOVER_PREPARE,
  TASK_STATUS__RECOVER1,
  TASK_STATUS__RECOVER2,
L
Liu Jicong 已提交
53 54 55
};

enum {
L
Liu Jicong 已提交
56 57 58 59
  TASK_SCHED_STATUS__INACTIVE = 1,
  TASK_SCHED_STATUS__WAITING,
  TASK_SCHED_STATUS__ACTIVE,
  TASK_SCHED_STATUS__FAILED,
L
Liu Jicong 已提交
60
  TASK_SCHED_STATUS__DROPPING,
L
Liu Jicong 已提交
61 62 63 64 65 66 67
};

enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__STOP,
L
Liu Jicong 已提交
68
  TASK_INPUT_STATUS__FAILED,
L
Liu Jicong 已提交
69 70 71 72 73 74
};

enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
L
Liu Jicong 已提交
75 76
};

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
enum {
  TASK_TRIGGER_STATUS__INACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE,
};

enum {
  TASK_LEVEL__SOURCE = 1,
  TASK_LEVEL__AGG,
  TASK_LEVEL__SINK,
};

enum {
  TASK_OUTPUT__FIXED_DISPATCH = 1,
  TASK_OUTPUT__SHUFFLE_DISPATCH,
  TASK_OUTPUT__TABLE,
  TASK_OUTPUT__SMA,
  TASK_OUTPUT__FETCH,
};

L
Liu Jicong 已提交
96 97 98 99 100 101
enum {
  STREAM_QUEUE__SUCESS = 1,
  STREAM_QUEUE__FAILED,
  STREAM_QUEUE__PROCESSING,
};

L
Liu Jicong 已提交
102 103 104 105
typedef struct {
  int8_t type;
} SStreamQueueItem;

L
Liu Jicong 已提交
106
typedef struct {
L
Liu Jicong 已提交
107 108
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
109 110 111 112
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

113 114 115 116 117 118 119
typedef struct {
  int8_t  type;
  int64_t ver;
  SArray* dataRefs;  // SArray<int32_t*>
  SArray* reqs;      // SArray<SSubmitReq*>
} SStreamMergedSubmit;

L
Liu Jicong 已提交
120 121 122
typedef struct {
  int8_t type;

L
Liu Jicong 已提交
123
  int32_t srcVgId;
L
Liu Jicong 已提交
124
  int32_t childId;
L
Liu Jicong 已提交
125
  int64_t sourceVer;
L
Liu Jicong 已提交
126
  int64_t reqId;
L
Liu Jicong 已提交
127

L
Liu Jicong 已提交
128
  SArray* blocks;  // SArray<SSDataBlock>
L
Liu Jicong 已提交
129 130
} SStreamDataBlock;

L
Liu Jicong 已提交
131 132 133 134 135 136 137 138
// ref data block, for delete
typedef struct {
  int8_t       type;
  int64_t      ver;
  int32_t*     dataRef;
  SSDataBlock* pBlock;
} SStreamRefDataBlock;

L
Liu Jicong 已提交
139 140 141 142
typedef struct {
  int8_t type;
} SStreamCheckpoint;

L
Liu Jicong 已提交
143 144 145 146
typedef struct {
  int8_t type;
} SStreamTaskDestroy;

147 148 149 150 151
typedef struct {
  int8_t       type;
  SSDataBlock* pBlock;
} SStreamTrigger;

L
Liu Jicong 已提交
152 153 154 155
typedef struct {
  STaosQueue* queue;
  STaosQall*  qall;
  void*       qItem;
L
Liu Jicong 已提交
156 157
  int8_t      status;
} SStreamQueue;
L
Liu Jicong 已提交
158

159 160 161
int32_t streamInit();
void    streamCleanUp();

L
Liu Jicong 已提交
162 163 164 165 166 167 168
SStreamQueue* streamQueueOpen();
void          streamQueueClose(SStreamQueue* queue);

static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
L
Liu Jicong 已提交
169 170
}

L
Liu Jicong 已提交
171 172 173 174 175
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}

L
Liu Jicong 已提交
176 177 178 179
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
  //
  return queue->qItem;
}
L
Liu Jicong 已提交
180 181 182 183

static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
  if (dequeueFlag == STREAM_QUEUE__FAILED) {
L
Liu Jicong 已提交
184
    ASSERT(queue->qItem != NULL);
L
Liu Jicong 已提交
185
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
186
  } else {
187
    queue->qItem = NULL;
L
Liu Jicong 已提交
188 189 190 191 192
    taosGetQitem(queue->qall, &queue->qItem);
    if (queue->qItem == NULL) {
      taosReadAllQitems(queue->queue, queue->qall);
      taosGetQitem(queue->qall, &queue->qItem);
    }
L
Liu Jicong 已提交
193
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
194 195 196
  }
}

L
Liu Jicong 已提交
197
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);
L
Liu Jicong 已提交
198

L
Liu Jicong 已提交
199
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
L
Liu Jicong 已提交
200

L
Liu Jicong 已提交
201 202
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
203
typedef struct {
L
Liu Jicong 已提交
204
  char* qmsg;
L
Liu Jicong 已提交
205
  // followings are not applicable to encoder and decoder
L
Liu Jicong 已提交
206
  void* executor;
L
Liu Jicong 已提交
207 208 209
} STaskExec;

typedef struct {
L
Liu Jicong 已提交
210
  int32_t taskId;
L
Liu Jicong 已提交
211 212 213 214 215
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
216
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
217
  int32_t   waitingRspCnt;
L
Liu Jicong 已提交
218
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
219 220
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
221
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
222

L
Liu Jicong 已提交
223
typedef struct {
L
Liu Jicong 已提交
224
  int64_t         stbUid;
L
Liu Jicong 已提交
225
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
226
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
227
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
228 229
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
230
  STSchema* pTSchema;
L
Liu Jicong 已提交
231 232
} STaskSinkTb;

L
Liu Jicong 已提交
233
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
234

L
Liu Jicong 已提交
235
typedef struct {
L
Liu Jicong 已提交
236 237
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
238
  void*     vnode;
L
Liu Jicong 已提交
239
  FSmaSink* smaSink;
L
Liu Jicong 已提交
240 241 242 243 244 245
} STaskSinkSma;

typedef struct {
  int8_t reserved;
} STaskSinkFetch;

L
Liu Jicong 已提交
246 247 248 249
typedef struct {
  int32_t nodeId;
  int32_t childId;
  int32_t taskId;
L
Liu Jicong 已提交
250
  SEpSet  epSet;
L
Liu Jicong 已提交
251 252
} SStreamChildEpInfo;

L
Liu Jicong 已提交
253
typedef struct {
L
Liu Jicong 已提交
254 255
  int32_t srcNodeId;
  int32_t srcChildId;
L
Liu Jicong 已提交
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
  int64_t stateSaveVer;
  int64_t stateProcessedVer;
} SStreamCheckpointInfo;

typedef struct {
  int64_t streamId;
  int64_t checkTs;
  int32_t checkpointId;  // incremental
  int32_t taskId;
  SArray* checkpointVer;  // SArray<SStreamCheckpointInfo>
} SStreamMultiVgCheckpointInfo;

typedef struct {
  int32_t taskId;
  int32_t checkpointId;  // incremental
} SStreamCheckpointKey;

typedef struct {
  int32_t taskId;
  SArray* checkpointVer;
} SStreamRecoveringState;

278
typedef struct SStreamTask {
L
Liu Jicong 已提交
279 280
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
281
  int32_t totalLevel;
282 283
  int8_t  taskLevel;
  int8_t  outputType;
L
Liu Jicong 已提交
284 285
  int16_t dispatchMsgType;

286
  int8_t taskStatus;
L
Liu Jicong 已提交
287
  int8_t schedStatus;
L
Liu Jicong 已提交
288

L
Liu Jicong 已提交
289
  // node info
L
Liu Jicong 已提交
290
  int32_t selfChildId;
L
Liu Jicong 已提交
291 292 293
  int32_t nodeId;
  SEpSet  epSet;

L
Liu Jicong 已提交
294 295 296
  int64_t recoverSnapVer;
  int64_t startVer;

L
Liu Jicong 已提交
297 298 299
  // fill history
  int8_t fillHistory;

L
Liu Jicong 已提交
300 301
  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>
L
Liu Jicong 已提交
302 303
  int32_t nextCheckId;
  SArray* checkpointInfo;  // SArray<SStreamCheckpointInfo>
L
Liu Jicong 已提交
304

L
Liu Jicong 已提交
305 306 307
  // exec
  STaskExec exec;

308
  // output
L
Liu Jicong 已提交
309 310 311
  union {
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
312 313 314
    STaskSinkTb            tbSink;
    STaskSinkSma           smaSink;
    STaskSinkFetch         fetchSink;
L
Liu Jicong 已提交
315 316
  };

L
Liu Jicong 已提交
317 318
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
319

320 321
  // STaosQueue*   inputQueue1;
  // STaosQall*    inputQall;
L
Liu Jicong 已提交
322 323
  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
324

325 326 327 328 329 330 331
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

  // msg handle
  SMsgCb* pMsgCb;
332 333 334 335

  // state backend
  SStreamState* pState;

336
  // do not serialize
337 338 339 340
  int32_t recoverTryingDownstream;
  int32_t recoverWaitingUpstream;
  int64_t checkReqId;
  SArray* checkReqIds;  // shuffle
341

342
} SStreamTask;
L
Liu Jicong 已提交
343

L
Liu Jicong 已提交
344 345 346
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

L
Liu Jicong 已提交
347
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
348 349
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
350 351
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
352 353 354 355
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
L
Liu Jicong 已提交
356 357
      qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
358 359 360
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
L
Liu Jicong 已提交
361
    qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
L
Liu Jicong 已提交
362
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
L
Liu Jicong 已提交
363
    // qStreamInput(pTask->exec.executor, pSubmitClone);
L
Liu Jicong 已提交
364 365
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE ||
             pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
L
Liu Jicong 已提交
366
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
367
    // qStreamInput(pTask->exec.executor, pItem);
L
Liu Jicong 已提交
368 369
  } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
370
    // qStreamInput(pTask->exec.executor, pItem);
L
Liu Jicong 已提交
371
  } else if (pItem->type == STREAM_INPUT__GET_RES) {
372
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
373
    // qStreamInput(pTask->exec.executor, pItem);
374 375
  }

L
Liu Jicong 已提交
376
  if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
377
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
L
Liu Jicong 已提交
378 379
  }

380
#if 0
L
Liu Jicong 已提交
381 382
  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
383
#endif
L
Liu Jicong 已提交
384 385 386 387 388 389 390 391
  return 0;
}

static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
392
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
393
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
394
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
395
    taosFreeQitem(pBlock);
396
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
397
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
398
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
399
    taosFreeQitem(pBlock);
400 401 402
  } else {
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
L
Liu Jicong 已提交
403 404
  return 0;
}
L
Liu Jicong 已提交
405

L
Liu Jicong 已提交
406 407 408 409 410 411
typedef struct {
  SMsgHead head;
  int64_t  streamId;
  int32_t  taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
412 413 414
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
415 416 417
  int32_t dataSrcVgId;
  int32_t upstreamTaskId;
  int32_t upstreamChildId;
L
Liu Jicong 已提交
418
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
419 420 421
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
422 423 424 425
} SStreamDispatchReq;

typedef struct {
  int64_t streamId;
426 427 428 429
  int32_t upstreamNodeId;
  int32_t upstreamTaskId;
  int32_t downstreamNodeId;
  int32_t downstreamTaskId;
L
Liu Jicong 已提交
430 431 432
  int8_t  inputStatus;
} SStreamDispatchRsp;

L
Liu Jicong 已提交
433 434
typedef struct {
  int64_t            streamId;
L
Liu Jicong 已提交
435
  int64_t            reqId;
L
Liu Jicong 已提交
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
  int32_t            srcTaskId;
  int32_t            srcNodeId;
  int32_t            dstTaskId;
  int32_t            dstNodeId;
  int32_t            retrieveLen;
  SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;

typedef struct {
  int64_t streamId;
  int32_t childId;
  int32_t rspFromTaskId;
  int32_t rspToTaskId;
} SStreamRetrieveRsp;

451
typedef struct {
452
  int64_t reqId;
453
  int64_t streamId;
454 455 456 457
  int32_t upstreamNodeId;
  int32_t upstreamTaskId;
  int32_t downstreamNodeId;
  int32_t downstreamTaskId;
458
  int32_t childId;
459
} SStreamTaskCheckReq;
460

L
Liu Jicong 已提交
461
typedef struct {
462
  int64_t reqId;
L
Liu Jicong 已提交
463
  int64_t streamId;
L
Liu Jicong 已提交
464
  int32_t upstreamNodeId;
465 466 467 468 469 470
  int32_t upstreamTaskId;
  int32_t downstreamNodeId;
  int32_t downstreamTaskId;
  int32_t childId;
  int8_t  status;
} SStreamTaskCheckRsp;
L
Liu Jicong 已提交
471 472

typedef struct {
473 474 475 476
  SMsgHead msgHead;
  int64_t  streamId;
  int32_t  taskId;
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
L
Liu Jicong 已提交
477 478 479 480

typedef struct {
  int64_t streamId;
  int32_t taskId;
481 482
  int32_t childId;
} SStreamRecoverFinishReq;
L
Liu Jicong 已提交
483

484 485
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
L
Liu Jicong 已提交
486

L
Liu Jicong 已提交
487 488
typedef struct {
  int64_t streamId;
L
Liu Jicong 已提交
489 490 491
  int32_t downstreamTaskId;
  int32_t taskId;
} SStreamRecoverDownstreamReq;
L
Liu Jicong 已提交
492 493

typedef struct {
L
Liu Jicong 已提交
494 495 496 497 498 499
  int64_t streamId;
  int32_t downstreamTaskId;
  int32_t taskId;
  SArray* checkpointVer;  // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;

500 501 502 503 504 505
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);

int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);

L
Liu Jicong 已提交
506
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
L
Liu Jicong 已提交
507 508 509 510 511
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);

int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);

512
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
L
Liu Jicong 已提交
513
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
L
Liu Jicong 已提交
514 515 516
void    tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);

void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
517

518
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
519

L
Liu Jicong 已提交
520
int32_t streamProcessRunReq(SStreamTask* pTask);
L
Liu Jicong 已提交
521
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
522
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
L
Liu Jicong 已提交
523

L
Liu Jicong 已提交
524 525 526
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

L
Liu Jicong 已提交
527 528 529
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);

530 531 532
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);

// recover and fill history
533 534 535 536
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq);
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556
// common
int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
// source level
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask);
// agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);

// expand and deploy
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);

// meta
L
Liu Jicong 已提交
557 558 559 560
typedef struct SStreamMeta {
  char*        path;
  TDB*         db;
  TTB*         pTaskDb;
561
  TTB*         pCheckpointDb;
L
Liu Jicong 已提交
562
  SHashObj*    pTasks;
L
Liu Jicong 已提交
563
  SHashObj*    pRecoverStatus;
L
Liu Jicong 已提交
564 565 566
  void*        ahandle;
  TXN          txn;
  FTaskExpand* expandFunc;
567
  int32_t      vgId;
L
Liu Jicong 已提交
568
} SStreamMeta;
L
Liu Jicong 已提交
569

570
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
L
Liu Jicong 已提交
571 572
void         streamMetaClose(SStreamMeta* streamMeta);

573
int32_t      streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
574
int32_t      streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
L
Liu Jicong 已提交
575 576
int32_t      streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
L
Liu Jicong 已提交
577 578 579 580

int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
L
Liu Jicong 已提交
581
int32_t streamLoadTasks(SStreamMeta* pMeta);
L
Liu Jicong 已提交
582

L
Liu Jicong 已提交
583 584 585 586
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
587
#endif /* ifndef _STREAM_H_ */