tstream.h 13.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
L
Liu Jicong 已提交
17
#include "os.h"
L
Liu Jicong 已提交
18
#include "query.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tdbInt.h"
L
Liu Jicong 已提交
21 22
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
23
#include "tqueue.h"
L
Liu Jicong 已提交
24 25 26 27 28 29
#include "trpc.h"

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
30 31
#ifndef _STREAM_H_
#define _STREAM_H_
L
Liu Jicong 已提交
32

L
Liu Jicong 已提交
33 34
typedef struct SStreamTask SStreamTask;

35 36 37 38 39
enum {
  STREAM_STATUS__NORMAL = 0,
  STREAM_STATUS__RECOVER,
};

L
Liu Jicong 已提交
40
enum {
L
Liu Jicong 已提交
41 42
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
L
Liu Jicong 已提交
43 44 45 46
  TASK_STATUS__FAIL,
  TASK_STATUS__STOP,
  TASK_STATUS__PREPARE_RECOVER,
  TASK_STATUS__RECOVERING,
L
Liu Jicong 已提交
47 48 49
};

enum {
L
Liu Jicong 已提交
50 51 52 53
  TASK_SCHED_STATUS__INACTIVE = 1,
  TASK_SCHED_STATUS__WAITING,
  TASK_SCHED_STATUS__ACTIVE,
  TASK_SCHED_STATUS__FAILED,
L
Liu Jicong 已提交
54 55 56 57 58 59 60
};

enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__STOP,
L
Liu Jicong 已提交
61
  TASK_INPUT_STATUS__FAILED,
L
Liu Jicong 已提交
62 63 64 65 66 67
};

enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
L
Liu Jicong 已提交
68 69
};

70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
enum {
  TASK_TRIGGER_STATUS__INACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE,
};

enum {
  TASK_LEVEL__SOURCE = 1,
  TASK_LEVEL__AGG,
  TASK_LEVEL__SINK,
};

enum {
  TASK_OUTPUT__FIXED_DISPATCH = 1,
  TASK_OUTPUT__SHUFFLE_DISPATCH,
  TASK_OUTPUT__TABLE,
  TASK_OUTPUT__SMA,
  TASK_OUTPUT__FETCH,
};

L
Liu Jicong 已提交
89 90 91 92 93 94
enum {
  STREAM_QUEUE__SUCESS = 1,
  STREAM_QUEUE__FAILED,
  STREAM_QUEUE__PROCESSING,
};

L
Liu Jicong 已提交
95 96 97 98
typedef struct {
  int8_t type;
} SStreamQueueItem;

L
Liu Jicong 已提交
99
typedef struct {
L
Liu Jicong 已提交
100 101
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
102 103 104 105
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

106 107 108 109 110 111 112
typedef struct {
  int8_t  type;
  int64_t ver;
  SArray* dataRefs;  // SArray<int32_t*>
  SArray* reqs;      // SArray<SSubmitReq*>
} SStreamMergedSubmit;

L
Liu Jicong 已提交
113 114 115
typedef struct {
  int8_t type;

L
Liu Jicong 已提交
116
  int32_t srcVgId;
L
Liu Jicong 已提交
117
  int32_t childId;
L
Liu Jicong 已提交
118
  int64_t sourceVer;
L
Liu Jicong 已提交
119
  int64_t reqId;
L
Liu Jicong 已提交
120 121 122 123 124 125 126 127

  SArray* blocks;  // SArray<SSDataBlock*>
} SStreamDataBlock;

typedef struct {
  int8_t type;
} SStreamCheckpoint;

128 129 130 131 132
typedef struct {
  int8_t       type;
  SSDataBlock* pBlock;
} SStreamTrigger;

L
Liu Jicong 已提交
133 134 135 136
typedef struct {
  STaosQueue* queue;
  STaosQall*  qall;
  void*       qItem;
L
Liu Jicong 已提交
137 138
  int8_t      status;
} SStreamQueue;
L
Liu Jicong 已提交
139

140 141 142
int32_t streamInit();
void    streamCleanUp();

L
Liu Jicong 已提交
143 144 145 146 147 148 149
SStreamQueue* streamQueueOpen();
void          streamQueueClose(SStreamQueue* queue);

static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
L
Liu Jicong 已提交
150 151
}

L
Liu Jicong 已提交
152 153 154 155 156 157 158 159 160 161
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}

static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }

static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
  if (dequeueFlag == STREAM_QUEUE__FAILED) {
L
Liu Jicong 已提交
162
    ASSERT(queue->qItem != NULL);
L
Liu Jicong 已提交
163
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
164
  } else {
165
    queue->qItem = NULL;
L
Liu Jicong 已提交
166 167 168 169 170
    taosGetQitem(queue->qall, &queue->qItem);
    if (queue->qItem == NULL) {
      taosReadAllQitems(queue->queue, queue->qall);
      taosGetQitem(queue->qall, &queue->qItem);
    }
L
Liu Jicong 已提交
171
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
172 173 174
  }
}

L
Liu Jicong 已提交
175
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
L
Liu Jicong 已提交
178

L
Liu Jicong 已提交
179 180
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
181
typedef struct {
L
Liu Jicong 已提交
182
  char* qmsg;
L
Liu Jicong 已提交
183
  // followings are not applicable to encoder and decoder
L
Liu Jicong 已提交
184
  void* executor;
L
Liu Jicong 已提交
185 186 187
} STaskExec;

typedef struct {
L
Liu Jicong 已提交
188
  int32_t taskId;
L
Liu Jicong 已提交
189 190 191 192 193
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
194
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
195
  int32_t   waitingRspCnt;
L
Liu Jicong 已提交
196
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
197 198
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
199
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
200

L
Liu Jicong 已提交
201
typedef struct {
L
Liu Jicong 已提交
202
  int64_t         stbUid;
L
Liu Jicong 已提交
203
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
204
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
205
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
206 207
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
208
  STSchema* pTSchema;
L
Liu Jicong 已提交
209 210 211
  SHashObj* pHash;  // groupId to tbuid
} STaskSinkTb;

L
Liu Jicong 已提交
212
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
213

L
Liu Jicong 已提交
214
typedef struct {
L
Liu Jicong 已提交
215 216
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
217
  void*     vnode;
L
Liu Jicong 已提交
218
  FSmaSink* smaSink;
L
Liu Jicong 已提交
219 220 221 222 223 224
} STaskSinkSma;

typedef struct {
  int8_t reserved;
} STaskSinkFetch;

L
Liu Jicong 已提交
225 226 227 228
typedef struct {
  int32_t nodeId;
  int32_t childId;
  int32_t taskId;
L
Liu Jicong 已提交
229 230 231
  // int64_t checkpointVer;
  // int64_t processedVer;
  SEpSet epSet;
L
Liu Jicong 已提交
232 233
} SStreamChildEpInfo;

L
Liu Jicong 已提交
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
typedef struct {
  int32_t nodeId;
  int32_t childId;
  int64_t stateSaveVer;
  int64_t stateProcessedVer;
} SStreamCheckpointInfo;

typedef struct {
  int64_t streamId;
  int64_t checkTs;
  int32_t checkpointId;  // incremental
  int32_t taskId;
  SArray* checkpointVer;  // SArray<SStreamCheckpointInfo>
} SStreamMultiVgCheckpointInfo;

typedef struct {
  int32_t taskId;
  int32_t checkpointId;  // incremental
} SStreamCheckpointKey;

typedef struct {
  int32_t taskId;
  SArray* checkpointVer;
} SStreamRecoveringState;

259
typedef struct SStreamTask {
L
Liu Jicong 已提交
260 261
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
262
  int32_t totalLevel;
263 264
  int8_t  taskLevel;
  int8_t  outputType;
L
Liu Jicong 已提交
265 266
  int16_t dispatchMsgType;

267
  int8_t taskStatus;
L
Liu Jicong 已提交
268
  int8_t schedStatus;
L
Liu Jicong 已提交
269

L
Liu Jicong 已提交
270
  // node info
L
Liu Jicong 已提交
271
  int32_t selfChildId;
L
Liu Jicong 已提交
272 273 274
  int32_t nodeId;
  SEpSet  epSet;

275 276
  // used for task source and sink,
  // while task agg should have processedVer for each child
L
Liu Jicong 已提交
277 278 279 280 281
  int64_t recoverSnapVer;
  int64_t startVer;
  int64_t checkpointVer;
  int64_t processedVer;

L
Liu Jicong 已提交
282 283
  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>
L
Liu Jicong 已提交
284 285
  int32_t nextCheckId;
  SArray* checkpointInfo;  // SArray<SStreamCheckpointInfo>
L
Liu Jicong 已提交
286

L
Liu Jicong 已提交
287 288 289
  // exec
  STaskExec exec;

290
  // output
L
Liu Jicong 已提交
291 292 293
  union {
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
294 295 296
    STaskSinkTb            tbSink;
    STaskSinkSma           smaSink;
    STaskSinkFetch         fetchSink;
L
Liu Jicong 已提交
297 298
  };

L
Liu Jicong 已提交
299 300
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
301 302 303

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
304

305 306 307 308 309 310 311
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

  // msg handle
  SMsgCb* pMsgCb;
312
} SStreamTask;
L
Liu Jicong 已提交
313

L
Liu Jicong 已提交
314 315 316
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

L
Liu Jicong 已提交
317
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
318 319
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
320 321
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
322 323 324 325
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
L
Liu Jicong 已提交
326 327
      qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
328 329 330
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
L
Liu Jicong 已提交
331
    qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
L
Liu Jicong 已提交
332
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
L
Liu Jicong 已提交
333
    // qStreamInput(pTask->exec.executor, pSubmitClone);
5
54liuyao 已提交
334
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
L
Liu Jicong 已提交
335
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
336
    // qStreamInput(pTask->exec.executor, pItem);
L
Liu Jicong 已提交
337 338
  } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
339
    // qStreamInput(pTask->exec.executor, pItem);
L
Liu Jicong 已提交
340
  } else if (pItem->type == STREAM_INPUT__GET_RES) {
341
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
342
    // qStreamInput(pTask->exec.executor, pItem);
343 344
  }

L
Liu Jicong 已提交
345
  if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
346
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
L
Liu Jicong 已提交
347 348
  }

349
#if 0
L
Liu Jicong 已提交
350 351
  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
352
#endif
L
Liu Jicong 已提交
353 354 355 356 357 358 359 360
  return 0;
}

static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
361
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
362
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
363
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
364
    taosFreeQitem(pBlock);
365
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
366
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
367
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
368
    taosFreeQitem(pBlock);
369 370 371
  } else {
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
L
Liu Jicong 已提交
372 373
  return 0;
}
L
Liu Jicong 已提交
374 375 376 377 378

typedef struct {
  int32_t reserved;
} SStreamTaskDeployRsp;

L
Liu Jicong 已提交
379 380 381 382 383 384 385 386 387 388 389
typedef struct {
  // SMsgHead     head;
  SStreamTask* task;
} SStreamTaskDeployReq;

typedef struct {
  SMsgHead head;
  int64_t  streamId;
  int32_t  taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
390 391 392
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
393 394 395
  int32_t dataSrcVgId;
  int32_t upstreamTaskId;
  int32_t upstreamChildId;
L
Liu Jicong 已提交
396
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
397 398 399
#if 0
  int64_t sourceVer;
#endif
L
Liu Jicong 已提交
400 401 402
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
403 404 405 406 407 408 409 410
} SStreamDispatchReq;

typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamDispatchRsp;

L
Liu Jicong 已提交
411 412
typedef struct {
  int64_t            streamId;
L
Liu Jicong 已提交
413
  int64_t            reqId;
L
Liu Jicong 已提交
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
  int32_t            srcTaskId;
  int32_t            srcNodeId;
  int32_t            dstTaskId;
  int32_t            dstNodeId;
  int32_t            retrieveLen;
  SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;

typedef struct {
  int64_t streamId;
  int32_t childId;
  int32_t rspFromTaskId;
  int32_t rspToTaskId;
} SStreamRetrieveRsp;

L
Liu Jicong 已提交
429 430 431
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
432 433
  int32_t upstreamTaskId;
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
434 435 436 437
} SStreamTaskRecoverReq;

typedef struct {
  int64_t streamId;
L
Liu Jicong 已提交
438 439
  int32_t rspTaskId;
  int32_t reqTaskId;
L
Liu Jicong 已提交
440 441 442
  int8_t  inputStatus;
} SStreamTaskRecoverRsp;

L
Liu Jicong 已提交
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq);
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq);

int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp);
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp);

typedef struct {
  int64_t streamId;
  int32_t taskId;
} SMStreamTaskRecoverReq;

typedef struct {
  int64_t streamId;
  int32_t taskId;
} SMStreamTaskRecoverRsp;

int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq);
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq);

int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);

typedef struct {
  int64_t streamId;
} SPStreamTaskRecoverReq;

typedef struct {
  int8_t reserved;
} SPStreamTaskRecoverRsp;

473
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
L
Liu Jicong 已提交
474
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
L
Liu Jicong 已提交
475
void    tFreeStreamDispatchReq(SStreamDispatchReq* pReq);
476

477
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
478

L
Liu Jicong 已提交
479
int32_t streamProcessRunReq(SStreamTask* pTask);
L
Liu Jicong 已提交
480
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
L
Liu Jicong 已提交
481 482
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
483
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
L
Liu Jicong 已提交
484

L
Liu Jicong 已提交
485 486 487
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

L
Liu Jicong 已提交
488 489 490
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);

L
Liu Jicong 已提交
491 492 493 494 495 496 497 498
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);

typedef struct SStreamMeta {
  char*        path;
  TDB*         db;
  TTB*         pTaskDb;
  TTB*         pStateDb;
  SHashObj*    pTasks;
L
Liu Jicong 已提交
499
  SHashObj*    pRecoveringState;
L
Liu Jicong 已提交
500 501 502 503
  void*        ahandle;
  TXN          txn;
  FTaskExpand* expandFunc;
} SStreamMeta;
L
Liu Jicong 已提交
504

L
Liu Jicong 已提交
505
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
L
Liu Jicong 已提交
506 507
void         streamMetaClose(SStreamMeta* streamMeta);

L
Liu Jicong 已提交
508 509 510 511
int32_t      streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t      streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t      streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
L
Liu Jicong 已提交
512 513 514 515 516

int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);

L
Liu Jicong 已提交
517 518 519 520
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
521
#endif /* ifndef _STREAM_H_ */