tstream.h 10.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "os.h"
L
Liu Jicong 已提交
17 18 19
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
20
#include "tqueue.h"
L
Liu Jicong 已提交
21 22 23 24 25 26
#include "trpc.h"

/*
 * Include guard for this header. It is opened BEFORE the extern "C" linkage
 * block so that the block's opening brace and its closing brace (at the end
 * of the file, inside the guard) are skipped or kept together. With the guard
 * opened after `extern "C" {`, a second inclusion from C++ would open the
 * linkage block, skip the guarded region -- including the closing brace --
 * and leave the block unterminated.
 */
#ifndef _STREAM_H_
#define _STREAM_H_

#ifdef __cplusplus
extern "C" {
#endif
L
Liu Jicong 已提交
29

L
Liu Jicong 已提交
30 31
// Forward declaration: lets the callback typedefs below (e.g. FTbSink) take a
// SStreamTask* before the full struct definition appears later in this header.
typedef struct SStreamTask SStreamTask;

L
Liu Jicong 已提交
32
// Task status codes. Numbering starts at 1, so none of them equals 0.
enum {
  TASK_STATUS__IDLE = 1,
  TASK_STATUS__EXECUTING,
  TASK_STATUS__CLOSING,
};

// Input-side status codes of a task. streamTaskInput() and
// streamTaskInputFail() store NORMAL / PROCESSING / FAILED into
// SStreamTask.inputStatus. Numbering starts at 1.
enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__PROCESSING,
  TASK_INPUT_STATUS__STOP,
  TASK_INPUT_STATUS__FAILED,
};

// Output-side status codes of a task (presumably the values of
// SStreamTask.outputStatus -- confirm against the implementation).
// Numbering starts at 1.
enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
};

L
Liu Jicong 已提交
53 54 55 56 57
// Origin of a stream. Values are spelled out explicitly; numbering starts at 1.
enum {
  STREAM_CREATED_BY__USER = 1,
  STREAM_CREATED_BY__SMA = 2,
};

L
Liu Jicong 已提交
58 59 60
// Type tags carried in the leading `type` field of every queue item.
// streamTaskInput() switches on these to decide how an item is enqueued.
// Numbering starts at 1.
enum {
  STREAM_INPUT__DATA_SUBMIT = 1,
  STREAM_INPUT__DATA_BLOCK,
  STREAM_INPUT__TRIGGER,
  STREAM_INPUT__CHECKPOINT,
};

L
Liu Jicong 已提交
65 66 67 68
// Generic view of a queue item: only the leading type tag. streamTaskInput()
// reads `type` (a STREAM_INPUT__* value) and casts the pointer to the concrete
// item struct, so every concrete item must start with an int8_t type field.
typedef struct {
  int8_t type;
} SStreamQueueItem;

L
Liu Jicong 已提交
69
typedef struct {
L
Liu Jicong 已提交
70 71
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

// Queue item carrying a batch of data blocks. streamTaskOutput() hands
// `blocks` to the configured sink callback or enqueues the whole item.
typedef struct {
  int8_t type;  // item type tag; must stay the first field

  int32_t sourceVg;
  int64_t sourceVer;

  SArray* blocks;  // SArray<SSDataBlock*>
} SStreamDataBlock;

// Queue item with no payload beyond its type tag (enqueued by
// streamTaskInput() when type is STREAM_INPUT__CHECKPOINT).
typedef struct {
  int8_t type;
} SStreamCheckpoint;

89 90 91 92 93
// Queue item for trigger events (enqueued by streamTaskInput() when type is
// STREAM_INPUT__TRIGGER).
typedef struct {
  int8_t       type;  // item type tag; must stay the first field
  SSDataBlock* pBlock;
} SStreamTrigger;

L
Liu Jicong 已提交
94 95 96 97 98 99
// Processing state of the item currently held by an SStreamQueue.
// NOTE: "SUCESS" is a misspelling kept as-is because it is part of the
// public name used by callers.
enum {
  STREAM_QUEUE__SUCESS = 1,
  STREAM_QUEUE__FAILED = 2,
  STREAM_QUEUE__PROCESSING = 3,
};

L
Liu Jicong 已提交
100 101 102 103
// A task-side queue: the underlying queue, a batch handle used to drain it,
// the item currently being processed, and that item's processing status.
typedef struct {
  STaosQueue* queue;   // items are written here (see streamTaskInput/streamTaskOutput)
  STaosQall*  qall;    // batch filled from `queue` by streamQueueNextItem()
  void*       qItem;   // item currently being processed, or NULL
  int8_t      status;  // STREAM_QUEUE__* value, accessed with atomic_*_8
} SStreamQueue;
L
Liu Jicong 已提交
106

107 108 109
// Module-level setup / teardown. Declared with (void) so that the prototypes
// are checked by the compiler; an empty parameter list in C declares a
// function with unspecified arguments.
int32_t streamInit(void);
void    streamCleanUp(void);

L
Liu Jicong 已提交
110 111 112 113 114 115 116
// Create / destroy an SStreamQueue. streamQueueOpen takes no arguments; it is
// declared with (void) so the compiler rejects accidental arguments.
SStreamQueue* streamQueueOpen(void);
void          streamQueueClose(SStreamQueue* queue);

// Mark the current item as successfully processed: drop the reference to it
// and move the queue status from PROCESSING to SUCESS.
// Asserts that the queue was in the PROCESSING state.
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  // Clear the slot first so the next streamQueueNextItem() fetches a new item.
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}

L
Liu Jicong 已提交
119 120 121 122 123 124 125 126 127 128
// Mark the current item as failed. qItem is deliberately left in place:
// streamQueueNextItem() returns the same item again when it sees FAILED.
// Asserts that the queue was in the PROCESSING state.
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}

// Return the item currently held by the queue (may be NULL); does not dequeue.
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }

// Switch the queue to PROCESSING and return the item to work on.
//
// If the previous status was FAILED, the item that failed is returned again
// (it must still be present). Otherwise the next item is taken from the
// current batch; when the batch yields nothing, the batch is refilled from
// the underlying queue and one more fetch is attempted.
//
// Returns queue->qItem after these steps; NOTE(review): presumably NULL when
// the queue is empty -- depends on taosGetQitem leaving qItem NULL.
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t prevStatus = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);

  if (prevStatus == STREAM_QUEUE__FAILED) {
    // Retry path: the failed item was never cleared.
    ASSERT(queue->qItem != NULL);
    return streamQueueCurItem(queue);
  }

  taosGetQitem(queue->qall, &queue->qItem);
  if (queue->qItem == NULL) {
    // Current batch exhausted: pull everything pending and try once more.
    taosReadAllQitems(queue->queue, queue->qall);
    taosGetQitem(queue->qall, &queue->qItem);
  }
  return streamQueueCurItem(queue);
}

L
Liu Jicong 已提交
141
// Lifecycle helpers for SStreamDataSubmit items (defined elsewhere).
// streamTaskInput() calls streamSubmitRefClone() and treats a NULL return as
// failure. NOTE(review): the names suggest reference counting through
// SStreamDataSubmit.dataRef -- confirm against the implementation.
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);

void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);

SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
147
// Compiled out: encode/decode declarations for SStreamDataBlock.
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void*   streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
#endif

L
Liu Jicong 已提交
152
// Execution part of a task.
typedef struct {
  char* qmsg;
  // followings are not applicable to encoder and decoder
  // void* inputHandle;
  void* executor;
} STaskExec;

// Dispatcher parameters for TASK_DISPATCH__INPLACE (member of the dispatch
// union in SStreamTask).
typedef struct {
  int32_t taskId;
} STaskDispatcherInplace;

typedef struct {
L
Liu Jicong 已提交
164
  int32_t taskId;
L
Liu Jicong 已提交
165 166 167 168 169
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
170 171 172
  // int8_t  hashMethod;
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
173 174
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
175
// Function type of the table-sink callback. streamTaskOutput() invokes it as
// tbSinkFunc(pTask, tbSink.vnode, 0, pBlock->blocks).
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177
typedef struct {
L
Liu Jicong 已提交
178
  int64_t         stbUid;
L
Liu Jicong 已提交
179
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
180
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
181
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
182 183
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
184
  STSchema* pTSchema;
L
Liu Jicong 已提交
185 186 187
  SHashObj* pHash;  // groupId to tbuid
} STaskSinkTb;

L
Liu Jicong 已提交
188
// Function type of the SMA-sink callback. streamTaskOutput() invokes it as
// smaSink(smaSink.vnode, smaSink.smaId, pBlock->blocks).
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
189

L
Liu Jicong 已提交
190
typedef struct {
L
Liu Jicong 已提交
191 192
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
193
  void*     vnode;
L
Liu Jicong 已提交
194
  FSmaSink* smaSink;
L
Liu Jicong 已提交
195 196 197 198 199 200 201 202
} STaskSinkSma;

// Fetch-sink configuration; currently holds only a placeholder field.
typedef struct {
  int8_t reserved;
} STaskSinkFetch;

// Task source kinds. Numbering starts at 1.
enum {
  TASK_SOURCE__SCAN = 1,
  TASK_SOURCE__PIPE,
  TASK_SOURCE__MERGE,
};

// Task execution kinds (presumably the values of SStreamTask.execType --
// confirm). Numbering starts at 1.
enum {
  TASK_EXEC__NONE = 1,
  TASK_EXEC__PIPE,
  TASK_EXEC__MERGE,
};

// Dispatch kinds compared against SStreamTask.dispatchType in
// streamTaskOutput(). Values are spelled out explicitly; numbering starts at 1.
enum {
  TASK_DISPATCH__NONE = 1,
  TASK_DISPATCH__INPLACE = 2,
  TASK_DISPATCH__FIXED = 3,
  TASK_DISPATCH__SHUFFLE = 4,
};

// Sink kinds compared against SStreamTask.sinkType in streamTaskOutput().
// Values are spelled out explicitly; numbering starts at 1.
enum {
  TASK_SINK__NONE = 1,
  TASK_SINK__TABLE = 2,
  TASK_SINK__SMA = 3,
  TASK_SINK__FETCH = 4,
};

L
Liu Jicong 已提交
227 228 229 230 231
// Input kinds of a task (presumably the values of SStreamTask.inputType --
// confirm). NOTE: "SUMBIT" is a misspelling kept as-is because it is part of
// the public name.
enum {
  TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
  TASK_INPUT_TYPE__DATA_BLOCK = 2,
};

232 233 234 235 236
// Values of SStreamTask.triggerStatus; streamTaskInput() flips IN_ACTIVE to
// ACTIVE when data arrives on a task with a non-zero triggerParam.
enum {
  TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE = 2,
};

L
Liu Jicong 已提交
237
struct SStreamTask {
L
Liu Jicong 已提交
238 239
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
240
  int8_t  inputType;
L
Liu Jicong 已提交
241 242 243 244 245 246 247
  int8_t  status;

  int8_t  execType;
  int8_t  sinkType;
  int8_t  dispatchType;
  int16_t dispatchMsgType;

L
Liu Jicong 已提交
248 249
  int8_t dataScan;

L
Liu Jicong 已提交
250 251
  // node info
  int32_t childId;
L
Liu Jicong 已提交
252 253 254
  int32_t nodeId;
  SEpSet  epSet;

L
Liu Jicong 已提交
255 256 257
  // exec
  STaskExec exec;

L
Liu Jicong 已提交
258 259 260
  // TODO: merge sink and dispatch

  //  local sink
L
Liu Jicong 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273
  union {
    STaskSinkTb    tbSink;
    STaskSinkSma   smaSink;
    STaskSinkFetch fetchSink;
  };

  // dispatch
  union {
    STaskDispatcherInplace inplaceDispatcher;
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
  };

L
Liu Jicong 已提交
274 275
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
276 277 278

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
279

280 281 282 283 284
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

L
Liu Jicong 已提交
285
  // application storage
L
Liu Jicong 已提交
286
  // void* ahandle;
287 288 289

  // msg handle
  SMsgCb* pMsgCb;
L
Liu Jicong 已提交
290
};
L
Liu Jicong 已提交
291

L
Liu Jicong 已提交
292
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
293 294
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
295 296
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
// Enqueue one item on the task's input queue.
//
// The task's inputStatus is first moved NORMAL -> PROCESSING; any other
// starting status hits ASSERT(0) and the exchange is retried. Submit items
// are enqueued as a clone obtained from streamSubmitRefClone(); data-block,
// checkpoint and trigger items are enqueued as-is. An item with any other
// type is not enqueued (0 is still returned, as before).
//
// For items other than trigger/checkpoint, a task with a non-zero
// triggerParam whose trigger is IN_ACTIVE is switched to ACTIVE.
//
// Returns 0 on success with inputStatus restored to NORMAL, or -1 if cloning
// a submit item failed, in which case inputStatus is left as FAILED.
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  while (1) {
    int8_t inputStatus =
        atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      break;
    }
    ASSERT(0);
  }

  switch (pItem->type) {
    case STREAM_INPUT__DATA_SUBMIT: {
      SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
      if (pSubmitClone == NULL) {
        atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
        return -1;
      }
      taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
      break;
    }
    case STREAM_INPUT__DATA_BLOCK:
    case STREAM_INPUT__CHECKPOINT:
    case STREAM_INPUT__TRIGGER:
      taosWriteQitem(pTask->inputQueue->queue, pItem);
      break;
    default:
      // Unrecognized item types are not enqueued.
      break;
  }

  // triggerStatus is written with atomic_store_8, so it is read atomically too.
  if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0 &&
      atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__IN_ACTIVE) {
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
  }

  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
  return 0;
}

// Force the task's inputStatus to FAILED (atomic store, no precondition check).
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

// Deliver one output block according to the task's sink type.
//
//  - TASK_SINK__TABLE: call tbSink.tbSinkFunc with the block array (ver is
//    passed as 0), then free the queue item.
//  - TASK_SINK__SMA:   call smaSink.smaSink with the block array, then free
//    the queue item.
//  - otherwise:        enqueue the block on the task's output queue.
//
// The two sink paths assert that no dispatcher is configured; the queue path
// asserts that one is. Always returns 0.
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  if (pTask->sinkType == TASK_SINK__TABLE) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
    taosFreeQitem(pBlock);
  } else if (pTask->sinkType == TASK_SINK__SMA) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
    taosFreeQitem(pBlock);
  } else {
    ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
  return 0;
}
L
Liu Jicong 已提交
351 352 353 354 355

// Response to a task deployment; currently holds only a placeholder field.
typedef struct {
  int32_t reserved;
} SStreamTaskDeployRsp;

L
Liu Jicong 已提交
356 357 358 359 360 361 362 363 364 365 366
// Task deployment request: carries a pointer to the task to deploy.
typedef struct {
  // SMsgHead     head;
  SStreamTask* task;
} SStreamTaskDeployReq;

// Request to run a task, identified by stream id and task id.
typedef struct {
  SMsgHead head;
  int64_t  streamId;
  int32_t  taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
367 368 369 370 371
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t sourceTaskId;
  int32_t sourceVg;
L
Liu Jicong 已提交
372
  int32_t sourceChildId;
L
Liu Jicong 已提交
373
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
374 375 376
#if 0
  int64_t sourceVer;
#endif
L
Liu Jicong 已提交
377 378 379
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
} SStreamDispatchReq;

// Response to a dispatch request; reports the receiving task's input status.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamDispatchRsp;

// Recover request addressed to a task, with the id and vgroup of the source.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t sourceTaskId;
  int32_t sourceVg;
} SStreamTaskRecoverReq;

// Response to a recover request; reports the task's input status.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamTaskRecoverRsp;

401 402
// Decoder for dispatch requests (defined elsewhere).
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);

// Task launch and trigger setup (defined elsewhere).
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
int32_t streamSetupTrigger(SStreamTask* pTask);

int32_t streamTaskRun(SStreamTask* pTask);

// Handlers for run / dispatch / recover messages (defined elsewhere).
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
L
Liu Jicong 已提交
413

L
Liu Jicong 已提交
414 415 416 417
#ifdef __cplusplus
}
#endif

#endif /* ifndef _STREAM_H_ */