tstream.h 12.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
L
Liu Jicong 已提交
17
#include "os.h"
L
Liu Jicong 已提交
18
#include "query.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tdbInt.h"
L
Liu Jicong 已提交
21 22
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
23
#include "tqueue.h"
L
Liu Jicong 已提交
24 25 26 27 28 29
#include "trpc.h"

// The include guard has to open before `extern "C" {`: the matching `}` near
// the end of this file sits inside the guard, so a repeated inclusion from C++
// must skip the opening brace as well, otherwise the braces end up unbalanced.
#ifndef _STREAM_H_
#define _STREAM_H_

#ifdef __cplusplus
extern "C" {
#endif
L
Liu Jicong 已提交
32

L
Liu Jicong 已提交
33 34
// Forward declaration: lets declarations that appear before the full struct
// definition in this header (e.g. FTbSink) refer to SStreamTask*.
typedef struct SStreamTask SStreamTask;

35 36 37 38 39
// STREAM_STATUS__* constants.
enum {
  STREAM_STATUS__NORMAL = 0,
  STREAM_STATUS__RECOVER = 1,
};

L
Liu Jicong 已提交
40
// TASK_STATUS__* constants; values are consecutive, starting at 0.
enum {
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
  TASK_STATUS__FAIL,
  TASK_STATUS__STOP,
  TASK_STATUS__PREPARE_RECOVER,
  TASK_STATUS__RECOVERING,
};

// TASK_SCHED_STATUS__* constants; values are consecutive, starting at 1.
enum {
  TASK_SCHED_STATUS__INACTIVE = 1,
  TASK_SCHED_STATUS__WAITING,
  TASK_SCHED_STATUS__ACTIVE,
  TASK_SCHED_STATUS__FAILED,
};

// TASK_INPUT_STATUS__* constants; values are consecutive, starting at 1.
// streamTaskInput and streamTaskInputFail store TASK_INPUT_STATUS__FAILED
// into SStreamTask.inputStatus.
enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__STOP,
  TASK_INPUT_STATUS__FAILED,
};

// TASK_OUTPUT_STATUS__* constants; values are consecutive, starting at 1.
enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
};

70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
// TASK_TRIGGER_STATUS__* constants. streamTaskInput moves
// SStreamTask.triggerStatus from INACTIVE to ACTIVE with a compare-and-exchange.
enum {
  TASK_TRIGGER_STATUS__INACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE = 2,
};

// TASK_LEVEL__* constants.
enum {
  TASK_LEVEL__SOURCE = 1,
  TASK_LEVEL__AGG = 2,
  TASK_LEVEL__SINK = 3,
};

// TASK_OUTPUT__* constants. streamTaskOutput compares SStreamTask.outputType
// against TASK_OUTPUT__TABLE and TASK_OUTPUT__SMA; every other value goes to
// the task's output queue.
enum {
  TASK_OUTPUT__FIXED_DISPATCH = 1,
  TASK_OUTPUT__SHUFFLE_DISPATCH = 2,
  TASK_OUTPUT__TABLE = 3,
  TASK_OUTPUT__SMA = 4,
  TASK_OUTPUT__FETCH = 5,
};

L
Liu Jicong 已提交
89 90 91 92 93 94
// STREAM_QUEUE__* constants, stored in SStreamQueue.status by the
// streamQueueProcessSuccess / streamQueueProcessFail / streamQueueNextItem
// helpers in this header.
enum {
  STREAM_QUEUE__SUCESS = 1,  // historical misspelling, kept so existing users still compile
  STREAM_QUEUE__FAILED,
  STREAM_QUEUE__PROCESSING,
  STREAM_QUEUE__SUCCESS = STREAM_QUEUE__SUCESS,  // correctly spelled alias, same value
};

L
Liu Jicong 已提交
95 96 97 98
// Minimal queue item: only the type tag. streamTaskInput inspects `type` and
// casts the item to SStreamDataSubmit when it is STREAM_INPUT__DATA_SUBMIT.
typedef struct {
  int8_t type;  // compared against STREAM_INPUT__* values in streamTaskInput
} SStreamQueueItem;

L
Liu Jicong 已提交
99
typedef struct {
L
Liu Jicong 已提交
100 101
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
102 103 104 105
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

106 107 108 109 110 111 112
// Queue item holding several submit requests; element types of the two arrays
// are given in the trailing comments.
typedef struct {
  int8_t type;
  int64_t ver;
  SArray* dataRefs; /* SArray<int32_t*> */
  SArray* reqs;     /* SArray<SSubmitReq*> */
} SStreamMergedSubmit;

L
Liu Jicong 已提交
113 114 115
typedef struct {
  int8_t type;

L
Liu Jicong 已提交
116
  int32_t srcVgId;
L
Liu Jicong 已提交
117
  int32_t childId;
L
Liu Jicong 已提交
118
  int64_t sourceVer;
L
Liu Jicong 已提交
119
  int64_t reqId;
L
Liu Jicong 已提交
120 121 122 123 124 125 126 127

  SArray* blocks;  // SArray<SSDataBlock*>
} SStreamDataBlock;

// Queue item with no payload beyond its type tag.
typedef struct {
  int8_t type;  // item type tag
} SStreamCheckpoint;

128 129 130 131 132
// Queue item pairing a type tag with a single data block pointer.
typedef struct {
  int8_t type;          // item type tag
  SSDataBlock* pBlock;  // block attached to the trigger
} SStreamTrigger;

L
Liu Jicong 已提交
133 134 135 136
// Queue wrapper used by the streamQueue* inline helpers in this header.
typedef struct {
  STaosQueue* queue;   // source queue; streamQueueNextItem reads all of its items into `qall`
  STaosQall*  qall;    // batch that streamQueueNextItem takes items from
  void*       qItem;   // item returned by the last streamQueueNextItem, NULL if none
  int8_t      status;  // one of STREAM_QUEUE__*; accessed through atomic_*_8
} SStreamQueue;
L
Liu Jicong 已提交
139

140 141 142
// Module-wide init / cleanup; only declared in this header.
// `(void)` gives real prototypes: in C an empty `()` leaves the parameters
// unspecified and disables argument checking at call sites.
int32_t streamInit(void);
void    streamCleanUp(void);

L
Liu Jicong 已提交
143 144 145 146 147 148 149
// Create / destroy an SStreamQueue; only declared in this header.
// `(void)` makes streamQueueOpen a real prototype in C (an empty `()` leaves
// the parameters unspecified).
SStreamQueue* streamQueueOpen(void);
void          streamQueueClose(SStreamQueue* queue);

// Marks the item currently being processed as done: clears the cached item
// pointer and stores STREAM_QUEUE__SUCESS into the status.
// Asserts that the status is STREAM_QUEUE__PROCESSING on entry.
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}

L
Liu Jicong 已提交
152 153 154 155 156 157 158 159 160 161
// Records that processing of the current item failed. qItem is left in place,
// so the next streamQueueNextItem call returns the same item again.
// Asserts that the status is STREAM_QUEUE__PROCESSING on entry.
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  int8_t* pStatus = &queue->status;
  ASSERT(atomic_load_8(pStatus) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(pStatus, STREAM_QUEUE__FAILED);
}

// Returns the item cached by the last streamQueueNextItem call (may be NULL).
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
  void* pCur = queue->qItem;
  return pCur;
}

// Switches the queue to STREAM_QUEUE__PROCESSING and returns the item to work on.
// - If the previous status was STREAM_QUEUE__FAILED, the cached item is returned
//   again (asserted non-NULL).
// - Otherwise the next item is taken from `qall`; if that yields nothing, all
//   items of `queue` are read into `qall` and one more attempt is made.
// Returns NULL when no item is available.
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
  if (dequeueFlag == STREAM_QUEUE__FAILED) {
    ASSERT(queue->qItem != NULL);
    return streamQueueCurItem(queue);
  }

  queue->qItem = NULL;
  taosGetQitem(queue->qall, &queue->qItem);
  if (queue->qItem == NULL) {
    taosReadAllQitems(queue->queue, queue->qall);
    taosGetQitem(queue->qall, &queue->qItem);
  }
  return streamQueueCurItem(queue);
}

L
Liu Jicong 已提交
175
// Helpers for SStreamDataSubmit items; only declared in this header.
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);

void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);

// streamTaskInput treats a NULL return from this function as out-of-memory.
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
181
// Execution part of a task (SStreamTask.exec).
typedef struct {
  char* qmsg;
  // followings are not applicable to encoder and decoder
  void* executor;
} STaskExec;

typedef struct {
L
Liu Jicong 已提交
188
  int32_t taskId;
L
Liu Jicong 已提交
189 190 191 192 193
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
194
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
195
  int32_t   waitingRspCnt;
L
Liu Jicong 已提交
196
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
197 198
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
199
// Function type of the callback held in STaskSinkTb.tbSinkFunc.
// streamTaskOutput calls it with the task, the sink's vnode, ver = 0 and the
// block array of the SStreamDataBlock as `data`.
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
200

L
Liu Jicong 已提交
201
typedef struct {
L
Liu Jicong 已提交
202
  int64_t         stbUid;
L
Liu Jicong 已提交
203
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
204
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
205
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
206 207
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
208
  STSchema* pTSchema;
L
Liu Jicong 已提交
209 210 211
  SHashObj* pHash;  // groupId to tbuid
} STaskSinkTb;

L
Liu Jicong 已提交
212
// Function type of the callback held in STaskSinkSma.smaSink.
// streamTaskOutput calls it with the sink's vnode, its smaId and the block
// array of the SStreamDataBlock.
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
213

L
Liu Jicong 已提交
214
typedef struct {
L
Liu Jicong 已提交
215 216
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
217
  void*     vnode;
L
Liu Jicong 已提交
218
  FSmaSink* smaSink;
L
Liu Jicong 已提交
219 220 221 222 223 224
} STaskSinkSma;

// Fetch sink descriptor (member of the SStreamTask output union); carries no
// data of its own.
typedef struct {
  int8_t reserved;  // placeholder member
} STaskSinkFetch;

L
Liu Jicong 已提交
225 226 227 228
typedef struct {
  int32_t nodeId;
  int32_t childId;
  int32_t taskId;
L
Liu Jicong 已提交
229 230
  int64_t checkpointVer;
  int64_t processedVer;
L
Liu Jicong 已提交
231 232 233
  SEpSet  epSet;
} SStreamChildEpInfo;

234
typedef struct SStreamTask {
L
Liu Jicong 已提交
235 236
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
237
  int32_t totalLevel;
238 239
  int8_t  taskLevel;
  int8_t  outputType;
L
Liu Jicong 已提交
240 241
  int16_t dispatchMsgType;

242
  int8_t taskStatus;
L
Liu Jicong 已提交
243
  int8_t schedStatus;
L
Liu Jicong 已提交
244

L
Liu Jicong 已提交
245
  // node info
L
Liu Jicong 已提交
246
  int32_t selfChildId;
L
Liu Jicong 已提交
247 248 249
  int32_t nodeId;
  SEpSet  epSet;

250 251
  // used for task source and sink,
  // while task agg should have processedVer for each child
L
Liu Jicong 已提交
252 253 254 255 256
  int64_t recoverSnapVer;
  int64_t startVer;
  int64_t checkpointVer;
  int64_t processedVer;

L
Liu Jicong 已提交
257 258 259
  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>

L
Liu Jicong 已提交
260 261 262
  // exec
  STaskExec exec;

263
  // output
L
Liu Jicong 已提交
264 265 266
  union {
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
267 268 269
    STaskSinkTb            tbSink;
    STaskSinkSma           smaSink;
    STaskSinkFetch         fetchSink;
L
Liu Jicong 已提交
270 271
  };

L
Liu Jicong 已提交
272 273
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
274 275 276

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
277

278 279 280 281 282 283 284
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

  // msg handle
  SMsgCb* pMsgCb;
285
} SStreamTask;
L
Liu Jicong 已提交
286

L
Liu Jicong 已提交
287 288 289
// Encoder / decoder pair for SStreamChildEpInfo; only declared in this header.
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

L
Liu Jicong 已提交
290
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
291 292
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
293 294
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
295 296 297 298
// Puts `pItem` on the task's input queue.
//
// - STREAM_INPUT__DATA_SUBMIT: a clone made by streamSubmitRefClone is enqueued
//   instead of the item itself. If cloning fails, terrno is set to
//   TSDB_CODE_OUT_OF_MEMORY, inputStatus becomes TASK_INPUT_STATUS__FAILED and
//   -1 is returned.
// - STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_RETRIEVE,
//   STREAM_INPUT__CHECKPOINT, STREAM_INPUT__GET_RES: the item is enqueued as is.
// - Any other type: nothing is enqueued.
//
// Afterwards, unless the item is a GET_RES or CHECKPOINT item, and only when
// triggerParam is non-zero, triggerStatus is moved from INACTIVE to ACTIVE.
// Returns 0 in every case except the clone failure above.
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
      qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
    qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE ||
             pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__GET_RES) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  }

  if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
  }

  // TODO: back pressure (a disabled draft stored TASK_INPUT_STATUS__NORMAL into inputStatus here)
  return 0;
}

// Atomically sets the task's inputStatus to TASK_INPUT_STATUS__FAILED.
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  int8_t* pInputStatus = &pTask->inputStatus;
  atomic_store_8(pInputStatus, TASK_INPUT_STATUS__FAILED);
}

// Delivers one result block according to pTask->outputType.
//
// - TASK_OUTPUT__TABLE: calls tbSink.tbSinkFunc with ver = 0 and the block array.
// - TASK_OUTPUT__SMA:   calls smaSink.smaSink with the sink's smaId and the block array.
//   In both cases the block array is then destroyed (elements released with
//   blockDataFreeRes) and `pBlock` itself is freed, so the caller must not
//   touch `pBlock` afterwards.
// - anything else: `pBlock` is written to the task's output queue.
//
// Always returns 0.
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
  } else {
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
    return 0;
  }

  // Both sink paths release the block once the callback has returned.
  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
  return 0;
}
L
Liu Jicong 已提交
347 348 349 350 351

// Deploy response; carries no data besides a placeholder.
typedef struct {
  int32_t reserved;  // placeholder member
} SStreamTaskDeployRsp;

L
Liu Jicong 已提交
352 353 354 355 356 357 358 359 360 361 362
// Deploy request: just a pointer to the task being deployed.
typedef struct {
  /* SMsgHead     head;  (member is disabled in the original definition) */
  SStreamTask* task;
} SStreamTaskDeployReq;

// Run request: message head followed by the (streamId, taskId) pair.
typedef struct {
  SMsgHead head;
  int64_t streamId;
  int32_t taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
363 364 365
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
366 367 368
  int32_t dataSrcVgId;
  int32_t upstreamTaskId;
  int32_t upstreamChildId;
L
Liu Jicong 已提交
369
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
370 371 372
#if 0
  int64_t sourceVer;
#endif
L
Liu Jicong 已提交
373 374 375
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
376 377 378 379 380 381 382 383
} SStreamDispatchReq;

// Dispatch response: identifies the task and reports an input status value.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t inputStatus;  // presumably a TASK_INPUT_STATUS__* value -- TODO confirm
} SStreamDispatchRsp;

L
Liu Jicong 已提交
384 385
typedef struct {
  int64_t            streamId;
L
Liu Jicong 已提交
386
  int64_t            reqId;
L
Liu Jicong 已提交
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
  int32_t            srcTaskId;
  int32_t            srcNodeId;
  int32_t            dstTaskId;
  int32_t            dstNodeId;
  int32_t            retrieveLen;
  SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;

// Retrieve response: stream id, child id and the two task ids involved.
typedef struct {
  int64_t streamId;
  int32_t childId;
  int32_t rspFromTaskId;  // task id named as the responder
  int32_t rspToTaskId;    // task id named as the recipient
} SStreamRetrieveRsp;

L
Liu Jicong 已提交
402 403 404
// Task recover request; encoded/decoded by tEncodeStreamTaskRecoverReq /
// tDecodeStreamTaskRecoverReq.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t upstreamTaskId;
  int32_t upstreamNodeId;
} SStreamTaskRecoverReq;

// Task recover response; encoded/decoded by tEncodeStreamTaskRecoverRsp /
// tDecodeStreamTaskRecoverRsp.
typedef struct {
  int64_t streamId;
  int32_t rspTaskId;
  int32_t reqTaskId;
  int8_t  inputStatus;
} SStreamTaskRecoverRsp;

L
Liu Jicong 已提交
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
// Encoder / decoder pair for SStreamTaskRecoverReq; only declared in this header.
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq);
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq);

// Encoder / decoder pair for SStreamTaskRecoverRsp; only declared in this header.
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp);
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp);

// Recover request identified by (streamId, taskId); encoded/decoded by
// tEncodeSMStreamTaskRecoverReq / tDecodeSMStreamTaskRecoverReq.
typedef struct {
  int64_t streamId;  // stream identifier
  int32_t taskId;    // task identifier
} SMStreamTaskRecoverReq;

// Recover response identified by (streamId, taskId); encoded/decoded by
// tEncodeSMStreamTaskRecoverRsp / tDecodeSMStreamTaskRecoverRsp.
typedef struct {
  int64_t streamId;  // stream identifier
  int32_t taskId;    // task identifier
} SMStreamTaskRecoverRsp;

// Encoder / decoder pair for SMStreamTaskRecoverReq; only declared in this header.
int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq);
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq);

// Encoder / decoder pair for SMStreamTaskRecoverRsp; only declared in this header.
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);

// Recover request that carries only the stream id.
typedef struct {
  int64_t streamId;  // stream identifier
} SPStreamTaskRecoverReq;

// Recover response; carries no data besides a placeholder.
typedef struct {
  int8_t reserved;  // placeholder member
} SPStreamTaskRecoverRsp;

446
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
L
Liu Jicong 已提交
447
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
448

449
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
450

L
Liu Jicong 已提交
451
int32_t streamProcessRunReq(SStreamTask* pTask);
L
Liu Jicong 已提交
452
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
L
Liu Jicong 已提交
453 454
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
455
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
L
Liu Jicong 已提交
456

L
Liu Jicong 已提交
457 458 459
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

L
Liu Jicong 已提交
460 461 462
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);

L
Liu Jicong 已提交
463 464 465 466 467 468 469 470 471 472 473 474
// Function type of the callback stored in SStreamMeta.expandFunc and passed
// to streamMetaOpen.
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);

// Stream metadata handle, returned by streamMetaOpen and taken by the other
// streamMeta* functions declared in this header.
typedef struct SStreamMeta {
  char* path;
  TDB* db;
  TTB* pTaskDb;
  TTB* pStateDb;
  SHashObj* pTasks;
  void* ahandle;  // same name as the opaque first parameter of FTaskExpand
  TXN txn;
  FTaskExpand* expandFunc;
} SStreamMeta;
L
Liu Jicong 已提交
475

L
Liu Jicong 已提交
476
// Open / close a stream metadata handle; only declared in this header.
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
void         streamMetaClose(SStreamMeta* streamMeta);

// Task registration and lookup by taskId.
int32_t      streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t      streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t      streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);

// Begin / commit / roll back on the metadata handle.
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);

L
Liu Jicong 已提交
488 489 490 491
#ifdef __cplusplus
}
#endif

#endif /* ifndef _STREAM_H_ */