tstream.h 15.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
L
Liu Jicong 已提交
17
#include "os.h"
L
Liu Jicong 已提交
18
#include "query.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tdbInt.h"
L
Liu Jicong 已提交
21 22
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
23
#include "tqueue.h"
L
Liu Jicong 已提交
24 25 26 27 28 29
#include "trpc.h"

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
30 31
#ifndef _STREAM_H_
#define _STREAM_H_
L
Liu Jicong 已提交
32

L
Liu Jicong 已提交
33 34
typedef struct SStreamTask SStreamTask;

35 36
enum {
  STREAM_STATUS__NORMAL = 0,
L
Liu Jicong 已提交
37 38
  STREAM_STATUS__STOP,
  STREAM_STATUS__FAILED,
39 40 41
  STREAM_STATUS__RECOVER,
};

L
Liu Jicong 已提交
42
enum {
L
Liu Jicong 已提交
43 44
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
L
Liu Jicong 已提交
45 46
  TASK_STATUS__FAIL,
  TASK_STATUS__STOP,
L
Liu Jicong 已提交
47 48
  TASK_STATUS__RECOVER_DOWNSTREAM,
  TASK_STATUS__RECOVER_SELF,
L
Liu Jicong 已提交
49 50 51
};

enum {
L
Liu Jicong 已提交
52 53 54 55
  TASK_SCHED_STATUS__INACTIVE = 1,
  TASK_SCHED_STATUS__WAITING,
  TASK_SCHED_STATUS__ACTIVE,
  TASK_SCHED_STATUS__FAILED,
L
Liu Jicong 已提交
56
  TASK_SCHED_STATUS__DROPPING,
L
Liu Jicong 已提交
57 58 59 60 61 62 63
};

enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__STOP,
L
Liu Jicong 已提交
64
  TASK_INPUT_STATUS__FAILED,
L
Liu Jicong 已提交
65 66 67 68 69 70
};

enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
L
Liu Jicong 已提交
71 72
};

73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
enum {
  TASK_TRIGGER_STATUS__INACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE,
};

enum {
  TASK_LEVEL__SOURCE = 1,
  TASK_LEVEL__AGG,
  TASK_LEVEL__SINK,
};

enum {
  TASK_OUTPUT__FIXED_DISPATCH = 1,
  TASK_OUTPUT__SHUFFLE_DISPATCH,
  TASK_OUTPUT__TABLE,
  TASK_OUTPUT__SMA,
  TASK_OUTPUT__FETCH,
};

L
Liu Jicong 已提交
92 93 94 95 96 97
enum {
  STREAM_QUEUE__SUCESS = 1,
  STREAM_QUEUE__FAILED,
  STREAM_QUEUE__PROCESSING,
};

L
Liu Jicong 已提交
98 99 100 101
typedef struct {
  int8_t type;
} SStreamQueueItem;

L
Liu Jicong 已提交
102
typedef struct {
L
Liu Jicong 已提交
103 104
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
105 106 107 108
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

109 110 111 112 113 114 115
typedef struct {
  int8_t  type;
  int64_t ver;
  SArray* dataRefs;  // SArray<int32_t*>
  SArray* reqs;      // SArray<SSubmitReq*>
} SStreamMergedSubmit;

L
Liu Jicong 已提交
116 117 118
typedef struct {
  int8_t type;

L
Liu Jicong 已提交
119
  int32_t srcVgId;
L
Liu Jicong 已提交
120
  int32_t childId;
L
Liu Jicong 已提交
121
  int64_t sourceVer;
L
Liu Jicong 已提交
122
  int64_t reqId;
L
Liu Jicong 已提交
123

L
Liu Jicong 已提交
124
  SArray* blocks;  // SArray<SSDataBlock>
L
Liu Jicong 已提交
125 126 127 128 129 130
} SStreamDataBlock;

typedef struct {
  int8_t type;
} SStreamCheckpoint;

L
Liu Jicong 已提交
131 132 133 134
typedef struct {
  int8_t type;
} SStreamTaskDestroy;

135 136 137 138 139
typedef struct {
  int8_t       type;
  SSDataBlock* pBlock;
} SStreamTrigger;

L
Liu Jicong 已提交
140 141 142 143
typedef struct {
  STaosQueue* queue;
  STaosQall*  qall;
  void*       qItem;
L
Liu Jicong 已提交
144 145
  int8_t      status;
} SStreamQueue;
L
Liu Jicong 已提交
146

147 148 149
int32_t streamInit();
void    streamCleanUp();

L
Liu Jicong 已提交
150 151 152 153 154 155 156
SStreamQueue* streamQueueOpen();
void          streamQueueClose(SStreamQueue* queue);

static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
L
Liu Jicong 已提交
157 158
}

L
Liu Jicong 已提交
159 160 161 162 163
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}

L
Liu Jicong 已提交
164 165 166 167
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
  //
  return queue->qItem;
}
L
Liu Jicong 已提交
168 169 170 171

static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
  if (dequeueFlag == STREAM_QUEUE__FAILED) {
L
Liu Jicong 已提交
172
    ASSERT(queue->qItem != NULL);
L
Liu Jicong 已提交
173
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
174
  } else {
175
    queue->qItem = NULL;
L
Liu Jicong 已提交
176 177 178 179 180
    taosGetQitem(queue->qall, &queue->qItem);
    if (queue->qItem == NULL) {
      taosReadAllQitems(queue->queue, queue->qall);
      taosGetQitem(queue->qall, &queue->qItem);
    }
L
Liu Jicong 已提交
181
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
182 183 184
  }
}

L
Liu Jicong 已提交
185
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);
L
Liu Jicong 已提交
186

L
Liu Jicong 已提交
187
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
L
Liu Jicong 已提交
188

L
Liu Jicong 已提交
189 190
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
191
typedef struct {
L
Liu Jicong 已提交
192
  char* qmsg;
L
Liu Jicong 已提交
193
  // followings are not applicable to encoder and decoder
L
Liu Jicong 已提交
194
  void* executor;
L
Liu Jicong 已提交
195 196 197
} STaskExec;

typedef struct {
L
Liu Jicong 已提交
198
  int32_t taskId;
L
Liu Jicong 已提交
199 200 201 202 203
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
204
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
205
  int32_t   waitingRspCnt;
L
Liu Jicong 已提交
206
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
207 208
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
209
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
210

L
Liu Jicong 已提交
211
typedef struct {
L
Liu Jicong 已提交
212
  int64_t         stbUid;
L
Liu Jicong 已提交
213
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
214
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
215
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
216 217
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
218
  STSchema* pTSchema;
L
Liu Jicong 已提交
219 220
} STaskSinkTb;

L
Liu Jicong 已提交
221
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
222

L
Liu Jicong 已提交
223
typedef struct {
L
Liu Jicong 已提交
224 225
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
226
  void*     vnode;
L
Liu Jicong 已提交
227
  FSmaSink* smaSink;
L
Liu Jicong 已提交
228 229 230 231 232 233
} STaskSinkSma;

typedef struct {
  int8_t reserved;
} STaskSinkFetch;

L
Liu Jicong 已提交
234 235 236 237
typedef struct {
  int32_t nodeId;
  int32_t childId;
  int32_t taskId;
L
Liu Jicong 已提交
238
  SEpSet  epSet;
L
Liu Jicong 已提交
239 240
} SStreamChildEpInfo;

L
Liu Jicong 已提交
241
typedef struct {
L
Liu Jicong 已提交
242 243
  int32_t srcNodeId;
  int32_t srcChildId;
L
Liu Jicong 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
  int64_t stateSaveVer;
  int64_t stateProcessedVer;
} SStreamCheckpointInfo;

typedef struct {
  int64_t streamId;
  int64_t checkTs;
  int32_t checkpointId;  // incremental
  int32_t taskId;
  SArray* checkpointVer;  // SArray<SStreamCheckpointInfo>
} SStreamMultiVgCheckpointInfo;

typedef struct {
  int32_t taskId;
  int32_t checkpointId;  // incremental
} SStreamCheckpointKey;

typedef struct {
  int32_t taskId;
  SArray* checkpointVer;
} SStreamRecoveringState;

266 267 268 269 270 271 272 273
// incremental state storage
typedef struct {
  SStreamTask* pOwner;
  TDB*         db;
  TTB*         pStateDb;
  TXN          txn;
} SStreamState;

274
typedef struct SStreamTask {
L
Liu Jicong 已提交
275 276
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
277
  int32_t totalLevel;
278 279
  int8_t  taskLevel;
  int8_t  outputType;
L
Liu Jicong 已提交
280 281
  int16_t dispatchMsgType;

282
  int8_t taskStatus;
L
Liu Jicong 已提交
283
  int8_t schedStatus;
L
Liu Jicong 已提交
284

L
Liu Jicong 已提交
285
  // node info
L
Liu Jicong 已提交
286
  int32_t selfChildId;
L
Liu Jicong 已提交
287 288 289
  int32_t nodeId;
  SEpSet  epSet;

L
Liu Jicong 已提交
290 291 292
  int64_t recoverSnapVer;
  int64_t startVer;

L
Liu Jicong 已提交
293 294
  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>
L
Liu Jicong 已提交
295 296
  int32_t nextCheckId;
  SArray* checkpointInfo;  // SArray<SStreamCheckpointInfo>
L
Liu Jicong 已提交
297

L
Liu Jicong 已提交
298 299 300
  // exec
  STaskExec exec;

301
  // output
L
Liu Jicong 已提交
302 303 304
  union {
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
305 306 307
    STaskSinkTb            tbSink;
    STaskSinkSma           smaSink;
    STaskSinkFetch         fetchSink;
L
Liu Jicong 已提交
308 309
  };

L
Liu Jicong 已提交
310 311
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
312 313 314

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
315

316 317 318 319 320 321 322
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

  // msg handle
  SMsgCb* pMsgCb;
323 324 325 326

  // state backend
  SStreamState* pState;

327
} SStreamTask;
L
Liu Jicong 已提交
328

L
Liu Jicong 已提交
329 330 331
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

L
Liu Jicong 已提交
332
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
333 334
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
335 336
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
337 338 339 340
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
L
Liu Jicong 已提交
341 342
      qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
343 344 345
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
L
Liu Jicong 已提交
346
    qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
L
Liu Jicong 已提交
347
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
L
Liu Jicong 已提交
348
    // qStreamInput(pTask->exec.executor, pSubmitClone);
5
54liuyao 已提交
349
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
L
Liu Jicong 已提交
350
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
351
    // qStreamInput(pTask->exec.executor, pItem);
L
Liu Jicong 已提交
352 353
  } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
354
    // qStreamInput(pTask->exec.executor, pItem);
L
Liu Jicong 已提交
355
  } else if (pItem->type == STREAM_INPUT__GET_RES) {
356
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
357
    // qStreamInput(pTask->exec.executor, pItem);
358 359
  }

L
Liu Jicong 已提交
360
  if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
361
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
L
Liu Jicong 已提交
362 363
  }

364
#if 0
L
Liu Jicong 已提交
365 366
  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
367
#endif
L
Liu Jicong 已提交
368 369 370 371 372 373 374 375
  return 0;
}

static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
376
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
377
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
378
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
379
    taosFreeQitem(pBlock);
380
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
381
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
382
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
383
    taosFreeQitem(pBlock);
384 385 386
  } else {
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
L
Liu Jicong 已提交
387 388
  return 0;
}
L
Liu Jicong 已提交
389

L
Liu Jicong 已提交
390 391 392 393 394 395
typedef struct {
  SMsgHead head;
  int64_t  streamId;
  int32_t  taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
396 397 398
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
399 400 401
  int32_t dataSrcVgId;
  int32_t upstreamTaskId;
  int32_t upstreamChildId;
L
Liu Jicong 已提交
402
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
403 404 405
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
406 407 408 409 410 411 412 413
} SStreamDispatchReq;

typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamDispatchRsp;

L
Liu Jicong 已提交
414 415
typedef struct {
  int64_t            streamId;
L
Liu Jicong 已提交
416
  int64_t            reqId;
L
Liu Jicong 已提交
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431
  int32_t            srcTaskId;
  int32_t            srcNodeId;
  int32_t            dstTaskId;
  int32_t            dstNodeId;
  int32_t            retrieveLen;
  SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;

typedef struct {
  int64_t streamId;
  int32_t childId;
  int32_t rspFromTaskId;
  int32_t rspToTaskId;
} SStreamRetrieveRsp;

L
Liu Jicong 已提交
432
#if 0
L
Liu Jicong 已提交
433 434 435
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
436 437
  int32_t upstreamTaskId;
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
438 439 440 441
} SStreamTaskRecoverReq;

typedef struct {
  int64_t streamId;
L
Liu Jicong 已提交
442 443
  int32_t rspTaskId;
  int32_t reqTaskId;
L
Liu Jicong 已提交
444 445 446
  int8_t  inputStatus;
} SStreamTaskRecoverRsp;

L
Liu Jicong 已提交
447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq);
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq);

int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp);
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp);

typedef struct {
  int64_t streamId;
  int32_t taskId;
} SMStreamTaskRecoverReq;

typedef struct {
  int64_t streamId;
  int32_t taskId;
} SMStreamTaskRecoverRsp;

int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq);
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq);

int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);

L
Liu Jicong 已提交
469 470 471 472
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
#endif

L
Liu Jicong 已提交
473 474
typedef struct {
  int64_t streamId;
L
Liu Jicong 已提交
475 476 477
  int32_t downstreamTaskId;
  int32_t taskId;
} SStreamRecoverDownstreamReq;
L
Liu Jicong 已提交
478 479

typedef struct {
L
Liu Jicong 已提交
480 481 482 483 484 485 486
  int64_t streamId;
  int32_t downstreamTaskId;
  int32_t taskId;
  SArray* checkpointVer;  // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;

int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
L
Liu Jicong 已提交
487 488 489 490 491 492 493 494 495 496 497 498
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);

int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);

typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t waitingRspCnt;
  int32_t totReq;
  SArray* info;  // SArray<SArray<SStreamCheckpointInfo>*>
} SStreamRecoverStatus;
L
Liu Jicong 已提交
499

500
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
L
Liu Jicong 已提交
501
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
L
Liu Jicong 已提交
502
void    tFreeStreamDispatchReq(SStreamDispatchReq* pReq);
503

504
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
505

L
Liu Jicong 已提交
506
int32_t streamProcessRunReq(SStreamTask* pTask);
L
Liu Jicong 已提交
507
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
L
Liu Jicong 已提交
508
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
L
Liu Jicong 已提交
509

L
Liu Jicong 已提交
510 511 512
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

L
Liu Jicong 已提交
513 514 515
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);

L
Liu Jicong 已提交
516 517 518 519 520 521
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);

typedef struct SStreamMeta {
  char*        path;
  TDB*         db;
  TTB*         pTaskDb;
522
  TTB*         pCheckpointDb;
L
Liu Jicong 已提交
523
  SHashObj*    pTasks;
L
Liu Jicong 已提交
524
  SHashObj*    pRecoverStatus;
L
Liu Jicong 已提交
525 526 527 528
  void*        ahandle;
  TXN          txn;
  FTaskExpand* expandFunc;
} SStreamMeta;
L
Liu Jicong 已提交
529

L
Liu Jicong 已提交
530
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
L
Liu Jicong 已提交
531 532
void         streamMetaClose(SStreamMeta* streamMeta);

L
Liu Jicong 已提交
533
int32_t      streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
534
int32_t      streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
L
Liu Jicong 已提交
535 536
int32_t      streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
L
Liu Jicong 已提交
537 538 539 540

int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
L
Liu Jicong 已提交
541
int32_t streamLoadTasks(SStreamMeta* pMeta);
L
Liu Jicong 已提交
542

543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
void          streamStateClose(SStreamState* pState);
int32_t       streamStateBegin(SStreamState* pState);
int32_t       streamStateCommit(SStreamState* pState);
int32_t       streamStateAbort(SStreamState* pState);

typedef struct {
  TBC* pCur;
} SStreamStateCur;

#if 1
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen);

SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen);
void             streamStateFreeCur(SStreamStateCur* pCur);

int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen);

int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);

int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);

#endif

L
Liu Jicong 已提交
573 574 575 576
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
577
#endif /* ifndef _STREAM_H_ */