tstream.h 10.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "os.h"
L
Liu Jicong 已提交
17 18 19
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
20
#include "tqueue.h"
L
Liu Jicong 已提交
21 22 23 24 25 26
#include "trpc.h"

// The include guard is opened before the extern "C" block so that a repeated
// inclusion skips both the opening and the closing half of the C++ linkage
// wrapper; opening extern "C" outside the guard left an unmatched '{' when
// this header was included twice from C++.
#ifndef _STREAM_H_
#define _STREAM_H_

#ifdef __cplusplus
extern "C" {
#endif
L
Liu Jicong 已提交
29

L
Liu Jicong 已提交
30 31
// Forward declaration so that definitions preceding the struct body (for
// example the FTbSink callback type) can refer to SStreamTask by pointer.
typedef struct SStreamTask SStreamTask;

L
Liu Jicong 已提交
32
// Task status codes; numbering starts at 0.
// NOTE(review): presumably the values held by SStreamTask.taskStatus — confirm.
enum {
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
};

// Task execution status codes; numbering starts at 1.
// NOTE(review): presumably the values held by SStreamTask.execStatus — confirm.
enum {
  TASK_EXEC_STATUS__IDLE = 1,
  TASK_EXEC_STATUS__EXECUTING,
  TASK_EXEC_STATUS__CLOSING,
};

// Input-side status codes; numbering starts at 1.
// streamTaskInput() and streamTaskInputFail() in this header store NORMAL,
// PROCESSING and FAILED into SStreamTask.inputStatus.
enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__PROCESSING,
  TASK_INPUT_STATUS__STOP,
  TASK_INPUT_STATUS__FAILED,
};

// Output-side status codes; numbering starts at 1.
// NOTE(review): presumably the values held by SStreamTask.outputStatus — confirm.
enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
};

L
Liu Jicong 已提交
58 59 60
// Kinds of queue item, carried in the leading `type` field of each item
// struct. streamTaskInput() inspects this value to decide how an item is
// enqueued. Numbering starts at 1.
enum {
  STREAM_INPUT__DATA_SUBMIT = 1,
  STREAM_INPUT__DATA_BLOCK,
  STREAM_INPUT__TRIGGER,
  STREAM_INPUT__CHECKPOINT,
  STREAM_INPUT__DROP,
};

L
Liu Jicong 已提交
66 67 68 69
// Generic view of a queue item: only the leading `type` tag (one of the
// STREAM_INPUT__* values). streamTaskInput() reads the tag through this type
// and casts the pointer to the concrete item struct when needed.
typedef struct {
  int8_t type;  // STREAM_INPUT__* tag
} SStreamQueueItem;

L
Liu Jicong 已提交
70
typedef struct {
L
Liu Jicong 已提交
71 72
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
73 74 75 76 77 78 79
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

typedef struct {
  int8_t type;

L
Liu Jicong 已提交
80
  int32_t srcVgId;
L
Liu Jicong 已提交
81 82 83 84 85 86 87 88 89
  int64_t sourceVer;

  SArray* blocks;  // SArray<SSDataBlock*>
} SStreamDataBlock;

// Queue item that carries no payload besides its tag.
// NOTE(review): presumably tagged STREAM_INPUT__CHECKPOINT — confirm against the producer.
typedef struct {
  int8_t type;  // STREAM_INPUT__* tag
} SStreamCheckpoint;

90 91 92 93 94
// Queue item holding a single data block pointer in addition to its tag.
// NOTE(review): presumably tagged STREAM_INPUT__TRIGGER — confirm against the producer.
typedef struct {
  int8_t       type;    // STREAM_INPUT__* tag
  SSDataBlock* pBlock;  // ownership is not visible from this header
} SStreamTrigger;

L
Liu Jicong 已提交
95 96 97 98 99 100
// Values of SStreamQueue.status, describing the outcome of the item most
// recently handed out by streamQueueNextItem().
enum {
  STREAM_QUEUE__SUCESS = 1,  // historical misspelling, kept for existing callers
  STREAM_QUEUE__FAILED,
  STREAM_QUEUE__PROCESSING,
  // Correctly spelled alias of STREAM_QUEUE__SUCESS.
  STREAM_QUEUE__SUCCESS = STREAM_QUEUE__SUCESS,
};

L
Liu Jicong 已提交
101 102 103 104
// Queue wrapper used by the streamQueue* helpers in this header.
typedef struct {
  STaosQueue* queue;   // items are written here (see streamTaskInput / streamTaskOutput)
  STaosQall*  qall;    // batch that streamQueueNextItem() refills from `queue` and reads from
  void*       qItem;   // item most recently handed out by streamQueueNextItem()
  int8_t      status;  // STREAM_QUEUE__* value, accessed with atomic_* helpers
} SStreamQueue;
L
Liu Jicong 已提交
107

108 109 110
// Module-level setup and teardown. Declared with (void) so that they are real
// prototypes in C: an empty parameter list leaves the arguments unchecked.
// NOTE(review): the return-value convention of streamInit is not visible here — confirm.
int32_t streamInit(void);
void    streamCleanUp(void);

L
Liu Jicong 已提交
111 112 113 114 115 116 117
// Create and destroy an SStreamQueue. streamQueueOpen is declared with (void)
// so that it is a real prototype in C.
// NOTE(review): presumably streamQueueOpen returns NULL on failure — confirm.
SStreamQueue* streamQueueOpen(void);
void          streamQueueClose(SStreamQueue* queue);

/**
 * Record that the item handed out by streamQueueNextItem() was processed.
 *
 * Asserts that the queue is in STREAM_QUEUE__PROCESSING, clears the current
 * item pointer and then publishes STREAM_QUEUE__SUCESS.
 *
 * @param queue queue whose in-flight item has been handled
 */
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}

L
Liu Jicong 已提交
120 121 122 123 124 125 126 127 128 129
/**
 * Record that processing of the current item failed.
 *
 * Asserts that the queue is in STREAM_QUEUE__PROCESSING and switches it to
 * STREAM_QUEUE__FAILED. The current item pointer is left untouched, so the
 * next streamQueueNextItem() call hands the same item out again.
 *
 * @param queue queue whose in-flight item could not be handled
 */
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  int8_t* pStatus = &queue->status;

  ASSERT(atomic_load_8(pStatus) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(pStatus, STREAM_QUEUE__FAILED);
}

// Return the item currently held in queue->qItem without changing any state.
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
  void* pCurrent = queue->qItem;
  return pCurrent;
}

/**
 * Hand out the next item to process and mark the queue as PROCESSING.
 *
 * If the previous status was STREAM_QUEUE__FAILED, the item that failed is
 * returned again. Otherwise an item is taken from `qall`; when that yields
 * nothing, `qall` is refilled from `queue` and one more attempt is made.
 *
 * @param queue queue to read from
 * @return the new current item (queue->qItem), which is NULL if no item was
 *         obtained
 */
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);

  if (dequeueFlag == STREAM_QUEUE__FAILED) {
    // Retry: the failed item must still be recorded as the current one.
    ASSERT(queue->qItem != NULL);
    return streamQueueCurItem(queue);
  }

  taosGetQitem(queue->qall, &queue->qItem);
  if (queue->qItem == NULL) {
    // Nothing left in the batch: pull everything pending and try once more.
    taosReadAllQitems(queue->queue, queue->qall);
    taosGetQitem(queue->qall, &queue->qItem);
  }
  return streamQueueCurItem(queue);
}

L
Liu Jicong 已提交
142
// Helpers for SStreamDataSubmit items; implemented outside this header.
// NOTE(review): presumably wraps pReq in a newly allocated item — confirm ownership of pReq.
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);

// NOTE(review): presumably drops one reference held through pDataSubmit->dataRef — confirm.
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);

// Used by streamTaskInput(), which treats a NULL result as failure.
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
148
// Execution part of a task.
typedef struct {
  char* qmsg;
  // the following field is not applicable to encoder and decoder
  void* executor;
} STaskExec;

// Dispatcher variant identified by a task id only.
// NOTE(review): presumably selected by TASK_DISPATCH__INPLACE — confirm.
typedef struct {
  int32_t taskId;
} STaskDispatcherInplace;

typedef struct {
L
Liu Jicong 已提交
159
  int32_t taskId;
L
Liu Jicong 已提交
160 161 162 163 164
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
165 166 167
  // int8_t  hashMethod;
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
168 169
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
170
// Callback type for the table sink. streamTaskOutput() calls it with the
// task, tbSink.vnode, a version of 0 and the block array of the output item.
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
171

L
Liu Jicong 已提交
172
typedef struct {
L
Liu Jicong 已提交
173
  int64_t         stbUid;
L
Liu Jicong 已提交
174
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
175
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
176
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
177 178
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
179
  STSchema* pTSchema;
L
Liu Jicong 已提交
180 181 182
  SHashObj* pHash;  // groupId to tbuid
} STaskSinkTb;

L
Liu Jicong 已提交
183
// Callback type for the SMA sink. streamTaskOutput() calls it with
// smaSink.vnode, smaSink.smaId and the block array of the output item.
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
184

L
Liu Jicong 已提交
185
typedef struct {
L
Liu Jicong 已提交
186 187
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
188
  void*     vnode;
L
Liu Jicong 已提交
189
  FSmaSink* smaSink;
L
Liu Jicong 已提交
190 191 192 193 194 195 196 197
} STaskSinkSma;

// Placeholder sink type: it has a single reserved byte and no other state.
typedef struct {
  int8_t reserved;
} STaskSinkFetch;

// Task source kinds; numbering starts at 1.
// NOTE(review): no user of these values is visible in this header — confirm where they are stored.
enum {
  TASK_SOURCE__SCAN = 1,
  TASK_SOURCE__PIPE,
  TASK_SOURCE__MERGE,
};

// Task execution kinds; numbering starts at 1.
// NOTE(review): presumably the values held by SStreamTask.execType — confirm.
enum {
  TASK_EXEC__NONE = 1,
  TASK_EXEC__PIPE,
  TASK_EXEC__MERGE,
};

// Dispatch kinds compared against SStreamTask.dispatchType (see
// streamTaskOutput). Values are spelled out; they match the implicit
// numbering that starts at 1.
enum {
  TASK_DISPATCH__NONE    = 1,
  TASK_DISPATCH__INPLACE = 2,
  TASK_DISPATCH__FIXED   = 3,
  TASK_DISPATCH__SHUFFLE = 4,
};

// Sink kinds compared against SStreamTask.sinkType (see streamTaskOutput).
// Values are spelled out; they match the implicit numbering that starts at 1.
enum {
  TASK_SINK__NONE  = 1,
  TASK_SINK__TABLE = 2,
  TASK_SINK__SMA   = 3,
  TASK_SINK__FETCH = 4,
};

L
Liu Jicong 已提交
222 223 224 225 226
// Task input kinds; numbering starts at 1.
// NOTE(review): no user of these values is visible in this header — confirm where they are stored.
enum {
  TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,  // historical misspelling, kept for existing callers
  TASK_INPUT_TYPE__DATA_BLOCK,
  // Correctly spelled alias of TASK_INPUT_TYPE__SUMBIT_BLOCK.
  TASK_INPUT_TYPE__SUBMIT_BLOCK = TASK_INPUT_TYPE__SUMBIT_BLOCK,
};

227 228 229 230 231
// Values of SStreamTask.triggerStatus. streamTaskInput() moves a task from
// IN_ACTIVE to ACTIVE when ordinary data arrives and triggerParam is non-zero.
enum {
  TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE    = 2,
};

L
Liu Jicong 已提交
232 233 234 235 236 237 238
// Endpoint description of one child task; SStreamTask.childEpInfo is an
// array of pointers to these. Encoded and decoded by tEncodeStreamEpInfo /
// tDecodeStreamEpInfo.
typedef struct {
  int32_t nodeId;
  int32_t childId;
  int32_t taskId;
  SEpSet  epSet;
} SStreamChildEpInfo;

L
Liu Jicong 已提交
239
struct SStreamTask {
L
Liu Jicong 已提交
240 241
  int64_t streamId;
  int32_t taskId;
242
  int8_t  isDataScan;
L
Liu Jicong 已提交
243 244 245 246 247
  int8_t  execType;
  int8_t  sinkType;
  int8_t  dispatchType;
  int16_t dispatchMsgType;

248 249
  int8_t taskStatus;
  int8_t execStatus;
L
Liu Jicong 已提交
250

L
Liu Jicong 已提交
251
  // node info
L
Liu Jicong 已提交
252
  int32_t selfChildId;
L
Liu Jicong 已提交
253 254 255
  int32_t nodeId;
  SEpSet  epSet;

L
Liu Jicong 已提交
256 257 258
  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>

L
Liu Jicong 已提交
259 260 261
  // exec
  STaskExec exec;

L
Liu Jicong 已提交
262 263 264
  // TODO: merge sink and dispatch

  //  local sink
L
Liu Jicong 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277
  union {
    STaskSinkTb    tbSink;
    STaskSinkSma   smaSink;
    STaskSinkFetch fetchSink;
  };

  // dispatch
  union {
    STaskDispatcherInplace inplaceDispatcher;
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
  };

L
Liu Jicong 已提交
278 279
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
280 281 282

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
283

284 285 286 287 288
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

L
Liu Jicong 已提交
289
  // application storage
L
Liu Jicong 已提交
290
  // void* ahandle;
291 292 293

  // msg handle
  SMsgCb* pMsgCb;
L
Liu Jicong 已提交
294
};
L
Liu Jicong 已提交
295

L
Liu Jicong 已提交
296 297 298
// Serialization pair for SStreamChildEpInfo; implemented outside this header.
// NOTE(review): return-value convention is not visible here — presumably 0 on success; confirm.
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

L
Liu Jicong 已提交
299
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
300 301
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
302 303
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
/**
 * Put one item on the input queue of a task.
 *
 * The input status is switched NORMAL -> PROCESSING for the duration of the
 * call and set back to NORMAL on success. Submit items are enqueued as a
 * clone obtained from streamSubmitRefClone(); data-block, checkpoint and
 * trigger items are enqueued as-is. Items of any other type are not enqueued,
 * yet the call still returns 0.
 *
 * When an item other than a trigger or checkpoint arrives, triggerParam is
 * non-zero and the trigger is IN_ACTIVE, the trigger is switched to ACTIVE.
 *
 * @param pTask task that receives the item
 * @param pItem item to enqueue; its `type` field selects the handling
 * @return 0 on success; -1 if cloning a submit item failed, in which case the
 *         input status is left as TASK_INPUT_STATUS__FAILED
 */
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  // Read the tag once, before the item is handed to the queue: after
  // taosWriteQitem() the item may be consumed elsewhere, so it must not be
  // dereferenced again.
  const int8_t type = pItem->type;

  // Claim the input side. Observing any state other than NORMAL trips the
  // assertion; if ASSERT is compiled out, the loop simply retries.
  while (1) {
    int8_t inputStatus =
        atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      break;
    }
    ASSERT(0);
  }

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRIGGER) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  }

  if (type != STREAM_INPUT__TRIGGER && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0 &&
      pTask->triggerStatus == TASK_TRIGGER_STATUS__IN_ACTIVE) {
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
  }

  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
  return 0;
}

// Unconditionally mark the input side of the task as failed by atomically
// storing TASK_INPUT_STATUS__FAILED into pTask->inputStatus.
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  int8_t* pInputStatus = &pTask->inputStatus;
  atomic_store_8(pInputStatus, TASK_INPUT_STATUS__FAILED);
}

/**
 * Deliver one output block of a task.
 *
 * - TASK_SINK__TABLE: the block array is passed to tbSink.tbSinkFunc and the
 *   item is then freed.
 * - TASK_SINK__SMA: the block array is passed to smaSink.smaSink and the item
 *   is then freed.
 * - any other sink type: the item is written to the task's output queue.
 *
 * The two sink paths assert that dispatchType is TASK_DISPATCH__NONE; the
 * queue path asserts that it is not.
 *
 * @param pTask  task producing the output
 * @param pBlock output item; it is freed here or handed over to the output
 *               queue, so the caller must not use it afterwards
 * @return always 0
 */
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  if (pTask->sinkType == TASK_SINK__TABLE) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
    taosFreeQitem(pBlock);
  } else if (pTask->sinkType == TASK_SINK__SMA) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
    taosFreeQitem(pBlock);
  } else {
    ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
  return 0;
}
L
Liu Jicong 已提交
358 359 360 361 362

// Response to a task deployment; currently only a reserved field.
typedef struct {
  int32_t reserved;
} SStreamTaskDeployRsp;

L
Liu Jicong 已提交
363 364 365 366 367 368 369 370 371 372 373
// Task deployment request: a pointer to the task to deploy.
// NOTE(review): ownership of `task` is not visible from this header — confirm.
typedef struct {
  // SMsgHead     head;
  SStreamTask* task;
} SStreamTaskDeployReq;

// Request naming a task by stream id and task id, preceded by a message header.
// NOTE(review): presumably asks the receiver to run that task — confirm.
typedef struct {
  SMsgHead head;
  int64_t  streamId;
  int32_t  taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
374 375 376
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
377 378 379
  int32_t dataSrcVgId;
  int32_t upstreamTaskId;
  int32_t upstreamChildId;
L
Liu Jicong 已提交
380
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
381 382 383
#if 0
  int64_t sourceVer;
#endif
L
Liu Jicong 已提交
384 385 386
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
387 388 389 390 391 392 393 394
} SStreamDispatchReq;

// Response to a dispatch request; handled by streamProcessDispatchRsp.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;  // NOTE(review): presumably a TASK_INPUT_STATUS__* value — confirm
} SStreamDispatchRsp;

L
Liu Jicong 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
// Retrieve request between two tasks; decoded by tDecodeStreamRetrieveReq
// and handled by streamProcessRetrieveReq.
typedef struct {
  int64_t            streamId;
  int32_t            srcTaskId;
  int32_t            srcNodeId;
  int32_t            dstTaskId;
  int32_t            dstNodeId;
  int32_t            retrieveLen;  // NOTE(review): presumably the byte length of pRetrieve — confirm
  SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;

// Response to a retrieve request; handled by streamProcessRetrieveRsp.
typedef struct {
  int64_t streamId;
  int32_t childId;
  int32_t rspFromTaskId;
  int32_t rspToTaskId;
} SStreamRetrieveRsp;

L
Liu Jicong 已提交
412 413 414 415 416 417 418 419 420 421 422 423 424
// Recover request for a task; handled by streamProcessRecoverReq.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t sourceTaskId;
  int32_t sourceVg;
} SStreamTaskRecoverReq;

// Response to a recover request; handled by streamProcessRecoverRsp.
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;  // NOTE(review): presumably a TASK_INPUT_STATUS__* value — confirm
} SStreamTaskRecoverRsp;

425
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
L
Liu Jicong 已提交
426
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
427

L
Liu Jicong 已提交
428
// Task launch and trigger setup; implemented outside this header.
// NOTE(review): semantics and return-value convention are not visible here — confirm.
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
430

L
Liu Jicong 已提交
431 432 433 434
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
435
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
L
Liu Jicong 已提交
436

L
Liu Jicong 已提交
437 438 439
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

L
Liu Jicong 已提交
440 441 442 443
// Close the C++ linkage block.
#ifdef __cplusplus
}
#endif

#endif /* ifndef _STREAM_H_ */