tstream.h 9.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
// The include guard must enclose everything in this header, including the
// extern "C" opener. With the opener outside the guard, a second inclusion
// in a C++ translation unit opened a linkage block whose closing brace was
// skipped by the guard.
#ifndef _STREAM_H_
#define _STREAM_H_

#include "os.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "trpc.h"

#ifdef __cplusplus
extern "C" {
#endif
L
Liu Jicong 已提交
29

L
Liu Jicong 已提交
30 31
// Forward declaration so that SStreamTask* can be used (e.g. by FTbSink)
// before the struct itself is defined further down in this header.
typedef struct SStreamTask SStreamTask;

L
Liu Jicong 已提交
32
// Task status values (presumably what SStreamTask::taskStatus holds — confirm
// against the implementation). Numbering starts at 0.
enum {
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
};

// Execution status values (presumably what SStreamTask::execStatus holds —
// confirm against the implementation). Numbering starts at 1.
enum {
  TASK_EXEC_STATUS__IDLE = 1,
  TASK_EXEC_STATUS__EXECUTING,
  TASK_EXEC_STATUS__CLOSING,
};

// Values stored in SStreamTask::inputStatus. streamTaskInput() moves the
// field NORMAL -> PROCESSING while it enqueues and back to NORMAL afterwards;
// FAILED is stored when enqueueing fails or by streamTaskInputFail().
enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__PROCESSING,
  TASK_INPUT_STATUS__STOP,
  TASK_INPUT_STATUS__FAILED,
};

// Output status values (presumably what SStreamTask::outputStatus holds —
// confirm against the implementation). Numbering starts at 1.
enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
};

L
Liu Jicong 已提交
58 59 60
// Type tag carried in the leading `type` field of every queue item;
// streamTaskInput() inspects it to decide how to enqueue the item.
enum {
  STREAM_INPUT__DATA_SUBMIT = 1,
  STREAM_INPUT__DATA_BLOCK,
  STREAM_INPUT__TRIGGER,
  STREAM_INPUT__CHECKPOINT,
  STREAM_INPUT__DROP,
};

L
Liu Jicong 已提交
66 67 68 69
// Generic view of a queue item: only the leading `type` tag (STREAM_INPUT__*).
// streamTaskInput() reads the tag and casts the item to the concrete struct,
// so every concrete item struct must begin with the same int8_t `type` field.
typedef struct {
  int8_t type;
} SStreamQueueItem;

L
Liu Jicong 已提交
70
typedef struct {
L
Liu Jicong 已提交
71 72
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

// Queue item carrying an array of data blocks; also the unit handed to
// streamTaskOutput().
typedef struct {
  int8_t type;  // item type tag; must stay the first field

  int32_t sourceVg;
  int64_t sourceVer;

  SArray* blocks;  // SArray<SSDataBlock*>
} SStreamDataBlock;

// Queue item with no payload beyond its type tag.
typedef struct {
  int8_t type;
} SStreamCheckpoint;

90 91 92 93 94
// Queue item for a trigger; carries a single data block pointer.
typedef struct {
  int8_t       type;  // item type tag; must stay the first field
  SSDataBlock* pBlock;
} SStreamTrigger;

L
Liu Jicong 已提交
95 96 97 98 99 100
// Values stored in SStreamQueue::status, recording the outcome of the item
// most recently handed out by streamQueueNextItem().
// NOTE: "SUCESS" is misspelled, but the identifier is part of the public
// interface and is therefore left as is.
enum {
  STREAM_QUEUE__SUCESS = 1,
  STREAM_QUEUE__FAILED = 2,
  STREAM_QUEUE__PROCESSING = 3,
};

L
Liu Jicong 已提交
101 102 103 104
// A task-side queue. streamQueueNextItem() hands out items from `qall` and
// refills it from `queue` when it runs empty.
typedef struct {
  STaosQueue* queue;   // underlying queue that producers write to
  STaosQall*  qall;    // batch read out of `queue`, consumed item by item
  void*       qItem;   // item currently handed out (kept for retry after a failure)
  int8_t      status;  // one of STREAM_QUEUE__*
} SStreamQueue;
L
Liu Jicong 已提交
107

108 109 110
// Module-level set-up and tear-down. Declared with (void): in C, an empty
// parameter list leaves the parameters unspecified, so calls are not
// argument-checked.
int32_t streamInit(void);
void    streamCleanUp(void);

L
Liu Jicong 已提交
111 112 113 114 115 116 117
// Creation and destruction of an SStreamQueue. streamQueueOpen takes no
// arguments; (void) makes that an actual prototype in C.
SStreamQueue* streamQueueOpen(void);
void          streamQueueClose(SStreamQueue* queue);

// Marks the item handed out by streamQueueNextItem() as handled: clears the
// remembered item and sets the queue status to STREAM_QUEUE__SUCESS.
// The queue must currently be in STREAM_QUEUE__PROCESSING (asserted).
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}

L
Liu Jicong 已提交
120 121 122 123 124 125 126 127 128 129
// Marks the item handed out by streamQueueNextItem() as failed. qItem is left
// untouched so that the next streamQueueNextItem() call returns the same item.
// The queue must currently be in STREAM_QUEUE__PROCESSING (asserted).
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}

// Returns the item the queue currently remembers in qItem; changes no state.
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
  void* pCurrent = queue->qItem;
  return pCurrent;
}

// Hands out the next item to process and switches the queue status to
// STREAM_QUEUE__PROCESSING.
//
// If the previous status was STREAM_QUEUE__FAILED, the remembered item is
// returned again (retry). Otherwise the next item is taken from `qall`; when
// that yields nothing, `qall` is refilled from `queue` and read once more.
// The result is whatever ends up in qItem (presumably NULL when both are
// empty — depends on taosGetQitem; confirm).
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
  if (dequeueFlag == STREAM_QUEUE__FAILED) {
    // Retry path: the failed item must still be remembered.
    ASSERT(queue->qItem != NULL);
    return streamQueueCurItem(queue);
  }

  taosGetQitem(queue->qall, &queue->qItem);
  if (queue->qItem == NULL) {
    // Current batch exhausted: pull a new batch from the underlying queue.
    taosReadAllQitems(queue->queue, queue->qall);
    taosGetQitem(queue->qall, &queue->qItem);
  }
  return streamQueueCurItem(queue);
}

L
Liu Jicong 已提交
142
// Helpers for SStreamDataSubmit items; implemented elsewhere.
// streamSubmitRefClone() returns NULL on failure (streamTaskInput() checks
// for it).
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);

void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);

SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
148
// Compiled out: these two prototypes are currently disabled.
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void*   streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
#endif

L
Liu Jicong 已提交
153
// Execution part of a task (SStreamTask::exec).
typedef struct {
  char* qmsg;
  // followings are not applicable to encoder and decoder
  void* executor;
} STaskExec;

// Dispatcher variant stored in SStreamTask::inplaceDispatcher; identifies the
// target only by task id.
typedef struct {
  int32_t taskId;
} STaskDispatcherInplace;

typedef struct {
L
Liu Jicong 已提交
164
  int32_t taskId;
L
Liu Jicong 已提交
165 166 167 168 169
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
170 171 172
  // int8_t  hashMethod;
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
173 174
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
175
// Signature of the table-sink callback stored in STaskSinkTb::tbSinkFunc.
// streamTaskOutput() invokes it as (pTask, tbSink.vnode, 0, pBlock->blocks).
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177
typedef struct {
L
Liu Jicong 已提交
178
  int64_t         stbUid;
L
Liu Jicong 已提交
179
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
180
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
181
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
182 183
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
184
  STSchema* pTSchema;
L
Liu Jicong 已提交
185 186 187
  SHashObj* pHash;  // groupId to tbuid
} STaskSinkTb;

L
Liu Jicong 已提交
188
// Signature of the SMA-sink callback stored in STaskSinkSma::smaSink.
// streamTaskOutput() invokes it as (smaSink.vnode, smaSink.smaId, pBlock->blocks).
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
189

L
Liu Jicong 已提交
190
typedef struct {
L
Liu Jicong 已提交
191 192
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
193
  void*     vnode;
L
Liu Jicong 已提交
194
  FSmaSink* smaSink;
L
Liu Jicong 已提交
195 196 197 198 199 200 201 202
} STaskSinkSma;

// Sink variant stored in SStreamTask::fetchSink; currently holds only a
// placeholder field.
typedef struct {
  int8_t reserved;
} STaskSinkFetch;

// Task source kinds. Numbering starts at 1.
enum {
  TASK_SOURCE__SCAN = 1,
  TASK_SOURCE__PIPE,
  TASK_SOURCE__MERGE,
};

// Task execution kinds (presumably what SStreamTask::execType holds —
// confirm against the implementation). Numbering starts at 1.
enum {
  TASK_EXEC__NONE = 1,
  TASK_EXEC__PIPE,
  TASK_EXEC__MERGE,
};

// Values of SStreamTask::dispatchType. streamTaskOutput() asserts that tasks
// with a table or SMA sink use TASK_DISPATCH__NONE and that all other tasks
// use something else.
enum {
  TASK_DISPATCH__NONE = 1,
  TASK_DISPATCH__INPLACE = 2,
  TASK_DISPATCH__FIXED = 3,
  TASK_DISPATCH__SHUFFLE = 4,
};

// Values of SStreamTask::sinkType; streamTaskOutput() branches on TABLE and
// SMA and sends everything else to the output queue.
enum {
  TASK_SINK__NONE = 1,
  TASK_SINK__TABLE = 2,
  TASK_SINK__SMA = 3,
  TASK_SINK__FETCH = 4,
};

L
Liu Jicong 已提交
227 228 229 230 231
// Input kinds of a task (presumably what SStreamTask::inputType holds —
// confirm against the implementation).
// NOTE: "SUMBIT" is misspelled, but the identifier is part of the public
// interface and is therefore left as is.
enum {
  TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
  TASK_INPUT_TYPE__DATA_BLOCK = 2,
};

232 233 234 235 236
// Values of SStreamTask::triggerStatus. streamTaskInput() flips IN_ACTIVE to
// ACTIVE when a non-trigger, non-checkpoint item arrives on a task whose
// triggerParam is non-zero.
enum {
  TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE = 2,
};

L
Liu Jicong 已提交
237
struct SStreamTask {
L
Liu Jicong 已提交
238 239
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
240
  int8_t  inputType;
L
Liu Jicong 已提交
241 242 243
  int8_t  taskStatus;

  int8_t execStatus;
L
Liu Jicong 已提交
244 245 246 247 248 249

  int8_t  execType;
  int8_t  sinkType;
  int8_t  dispatchType;
  int16_t dispatchMsgType;

L
Liu Jicong 已提交
250 251
  int8_t dataScan;

L
Liu Jicong 已提交
252 253
  // node info
  int32_t childId;
L
Liu Jicong 已提交
254 255 256
  int32_t nodeId;
  SEpSet  epSet;

L
Liu Jicong 已提交
257 258 259
  // exec
  STaskExec exec;

L
Liu Jicong 已提交
260 261 262
  // TODO: merge sink and dispatch

  //  local sink
L
Liu Jicong 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275
  union {
    STaskSinkTb    tbSink;
    STaskSinkSma   smaSink;
    STaskSinkFetch fetchSink;
  };

  // dispatch
  union {
    STaskDispatcherInplace inplaceDispatcher;
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
  };

L
Liu Jicong 已提交
276 277
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
278 279 280

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
281

282 283 284 285 286
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

L
Liu Jicong 已提交
287
  // application storage
L
Liu Jicong 已提交
288
  // void* ahandle;
289 290 291

  // msg handle
  SMsgCb* pMsgCb;
L
Liu Jicong 已提交
292
};
L
Liu Jicong 已提交
293

L
Liu Jicong 已提交
294
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
295 296
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
297 298
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
// Enqueues one item on the task's input queue.
//
// The input side is claimed by switching inputStatus NORMAL -> PROCESSING and
// released (back to NORMAL) before returning 0.
//  - STREAM_INPUT__DATA_SUBMIT: a clone obtained from streamSubmitRefClone()
//    is enqueued, not pItem itself. If cloning fails, inputStatus is set to
//    TASK_INPUT_STATUS__FAILED and -1 is returned.
//  - STREAM_INPUT__DATA_BLOCK / CHECKPOINT / TRIGGER: pItem itself is enqueued.
//  - NOTE(review): any other type (e.g. STREAM_INPUT__DROP) is not enqueued,
//    yet the function still returns 0 — confirm this is intended.
//
// Additionally, a non-trigger, non-checkpoint item activates the trigger
// (IN_ACTIVE -> ACTIVE) when triggerParam is non-zero.
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  // Any state other than NORMAL trips the assertion; if ASSERT compiles to
  // nothing, the loop keeps retrying until the state becomes NORMAL.
  while (1) {
    int8_t inputStatus =
        atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      break;
    }
    ASSERT(0);
  }

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__CHECKPOINT ||
             pItem->type == STREAM_INPUT__TRIGGER) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  }

  if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0 &&
      pTask->triggerStatus == TASK_TRIGGER_STATUS__IN_ACTIVE) {
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
  }

  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
  return 0;
}

// Unconditionally sets the task's inputStatus to TASK_INPUT_STATUS__FAILED.
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

// Delivers one output block according to the task's sink type.
//  - TASK_SINK__TABLE: calls tbSink.tbSinkFunc with the block array, then
//    frees pBlock.
//  - TASK_SINK__SMA: calls smaSink.smaSink with the block array, then frees
//    pBlock.
//  - otherwise: writes pBlock to the output queue (pBlock is not freed here).
// The first two cases require dispatchType == TASK_DISPATCH__NONE, the last
// requires any other dispatch type (asserted). Always returns 0.
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  if (pTask->sinkType == TASK_SINK__TABLE) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
    taosFreeQitem(pBlock);
  } else if (pTask->sinkType == TASK_SINK__SMA) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
    taosFreeQitem(pBlock);
  } else {
    ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
  return 0;
}
L
Liu Jicong 已提交
353 354 355 356 357

// Response to a task deploy request; currently holds only a placeholder field.
typedef struct {
  int32_t reserved;
} SStreamTaskDeployRsp;

L
Liu Jicong 已提交
358 359 360 361 362 363 364 365 366 367 368
// Task deploy request: carries a pointer to the task to deploy. The message
// head field is currently commented out.
typedef struct {
  // SMsgHead     head;
  SStreamTask* task;
} SStreamTaskDeployReq;

// Message that names a task by (streamId, taskId), preceded by a message head;
// presumably asks the receiver to run that task — confirm with the handler.
typedef struct {
  SMsgHead head;
  int64_t  streamId;
  int32_t  taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
369 370 371 372 373
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t sourceTaskId;
  int32_t sourceVg;
L
Liu Jicong 已提交
374
  int32_t sourceChildId;
L
Liu Jicong 已提交
375
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
376 377 378
#if 0
  int64_t sourceVer;
#endif
L
Liu Jicong 已提交
379 380 381
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
} SStreamDispatchReq;

// Response to a dispatch request: names the task and reports an input status
// (presumably a TASK_INPUT_STATUS__* value — confirm with the sender).
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamDispatchRsp;

// Recover request: names the target task and the source task / vgroup it
// comes from.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t sourceTaskId;
  int32_t sourceVg;
} SStreamTaskRecoverReq;

// Response to a recover request: names the task and reports an input status
// (presumably a TASK_INPUT_STATUS__* value — confirm with the sender).
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamTaskRecoverRsp;

403 404
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);

L
Liu Jicong 已提交
405
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
406
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
407

L
Liu Jicong 已提交
408 409 410 411
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
412
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
L
Liu Jicong 已提交
413

L
Liu Jicong 已提交
414 415 416 417
#ifdef __cplusplus
}
#endif

#endif /* ifndef _STREAM_H_ */