tstream.h 10.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "os.h"
L
Liu Jicong 已提交
17 18 19
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
20
#include "tqueue.h"
L
Liu Jicong 已提交
21 22 23 24 25 26
#include "trpc.h"

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
27 28
#ifndef _STREAM_H_
#define _STREAM_H_
L
Liu Jicong 已提交
29

L
Liu Jicong 已提交
30 31
// Forward declaration so that function-pointer typedefs and structs declared
// before the full `struct SStreamTask` definition can refer to SStreamTask*.
typedef struct SStreamTask SStreamTask;

L
Liu Jicong 已提交
32
// Task-level status codes.
// NOTE(review): presumably the values stored in SStreamTask.taskStatus — confirm.
enum {
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
};

// Execution status codes; numbering starts at 1, so 0 is not a valid value.
// NOTE(review): presumably the values stored in SStreamTask.execStatus — confirm.
enum {
  TASK_EXEC_STATUS__IDLE = 1,
  TASK_EXEC_STATUS__EXECUTING,
  TASK_EXEC_STATUS__CLOSING,
};

// Values of SStreamTask.inputStatus. streamTaskInput() moves the field
// NORMAL -> PROCESSING -> NORMAL (or FAILED on error); streamTaskInputFail()
// forces it to FAILED. Numbering starts at 1.
enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
  TASK_INPUT_STATUS__PROCESSING,
  TASK_INPUT_STATUS__STOP,
  TASK_INPUT_STATUS__FAILED,
};

// Output-side status codes; numbering starts at 1.
// NOTE(review): presumably the values stored in SStreamTask.outputStatus — confirm.
enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
};

L
Liu Jicong 已提交
58 59 60 61
// Generic view of an item passed to streamTaskInput(). Only the leading
// `type` tag is visible here; streamTaskInput() inspects it and, for
// STREAM_INPUT__DATA_SUBMIT, casts the pointer to SStreamDataSubmit*.
typedef struct {
  int8_t type;  // STREAM_INPUT__* tag identifying the concrete item
} SStreamQueueItem;

L
Liu Jicong 已提交
62
typedef struct {
L
Liu Jicong 已提交
63 64
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
65 66 67 68 69 70 71
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

typedef struct {
  int8_t type;

L
Liu Jicong 已提交
72
  int32_t srcVgId;
L
Liu Jicong 已提交
73 74 75 76 77 78 79 80 81
  int64_t sourceVer;

  SArray* blocks;  // SArray<SSDataBlock*>
} SStreamDataBlock;

// Queue item that carries nothing beyond its type tag.
// NOTE(review): presumably the item enqueued with type STREAM_INPUT__CHECKPOINT — confirm.
typedef struct {
  int8_t type;  // item tag, same position as in SStreamQueueItem
} SStreamCheckpoint;

82 83 84 85 86
// Queue item pairing a type tag with a single data block.
// NOTE(review): presumably the item enqueued with type STREAM_INPUT__TRIGGER — confirm.
typedef struct {
  int8_t       type;    // item tag, same position as in SStreamQueueItem
  SSDataBlock* pBlock;  // block attached to the trigger
} SStreamTrigger;

L
Liu Jicong 已提交
87 88 89 90 91 92
// Values of SStreamQueue.status, spelled out explicitly.
// streamQueueNextItem() sets PROCESSING; streamQueueProcessSuccess() and
// streamQueueProcessFail() set SUCESS / FAILED respectively.
// ("SUCESS" is the established spelling of the identifier.)
enum {
  STREAM_QUEUE__SUCESS     = 1,
  STREAM_QUEUE__FAILED     = 2,
  STREAM_QUEUE__PROCESSING = 3,
};

L
Liu Jicong 已提交
93 94 95 96
// A task-side queue plus the bookkeeping needed to retry a failed item.
typedef struct {
  STaosQueue* queue;   // underlying queue that producers write into
  STaosQall*  qall;    // batch that streamQueueNextItem() drains and refills from `queue`
  void*       qItem;   // item currently being processed; reset to NULL on success
  int8_t      status;  // STREAM_QUEUE__*; accessed with atomic_* helpers
} SStreamQueue;
L
Liu Jicong 已提交
99

100 101 102
// Module-level set-up and tear-down for the stream subsystem.
// Declared with (void) so that C compilers treat these as real prototypes and
// reject calls that pass arguments.
int32_t streamInit(void);
void    streamCleanUp(void);

L
Liu Jicong 已提交
103 104 105 106 107 108 109
// Creates / destroys an SStreamQueue.
// streamQueueOpen takes no arguments; (void) makes that a checked prototype in C.
// NOTE(review): presumably streamQueueOpen returns NULL on failure — confirm.
SStreamQueue* streamQueueOpen(void);
void          streamQueueClose(SStreamQueue* queue);

// Reports that the item returned by streamQueueNextItem() was handled
// successfully: forgets the current item and moves the queue from
// PROCESSING to SUCESS. Must only be called while the queue is PROCESSING.
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  // Clear the item first so the next streamQueueNextItem() fetches a new one.
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}

L
Liu Jicong 已提交
112 113 114 115 116 117 118 119 120 121
// Reports that handling the current item failed. The queue moves from
// PROCESSING to FAILED and qItem is left untouched, so the next
// streamQueueNextItem() call returns the same item again.
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  // Only valid while an item is being processed.
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}

// Returns the item the queue is currently holding in qItem, without
// changing any queue state.
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
  void* pCurrent = queue->qItem;
  return pCurrent;
}

// Marks the queue as PROCESSING and returns the item to work on.
//
// If the previous status was FAILED, the item that failed is returned again
// so the caller can retry it. Otherwise the next item is taken from `qall`;
// when `qall` yields nothing it is refilled from `queue` and asked once more.
//
// Returns queue->qItem after the above.
// NOTE(review): presumably NULL when both qall and queue are empty — confirm
// against taosGetQitem().
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);

  if (dequeueFlag == STREAM_QUEUE__FAILED) {
    // streamQueueProcessFail() keeps qItem, so a retry must have one.
    ASSERT(queue->qItem != NULL);
    return streamQueueCurItem(queue);
  }

  taosGetQitem(queue->qall, &queue->qItem);
  if (queue->qItem == NULL) {
    // Current batch exhausted: pull everything pending and try again.
    taosReadAllQitems(queue->queue, queue->qall);
    taosGetQitem(queue->qall, &queue->qItem);
  }
  return streamQueueCurItem(queue);
}

L
Liu Jicong 已提交
134
// Helpers for SStreamDataSubmit items.
// NOTE(review): judging by the names and the `dataRef` field, these presumably
// create an item around pReq, drop one reference, and create another item
// sharing the same data — confirm against the implementation.

SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);

void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);

// streamTaskInput() treats a NULL return as failure.
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
140
// Execution part of a stream task.
typedef struct {
  char* qmsg;  // NOTE(review): presumably the serialized query plan — confirm
  // followings are not applicable to encoder and decoder
  void* executor;  // opaque runtime executor handle
} STaskExec;

typedef struct {
L
Liu Jicong 已提交
147
  int32_t taskId;
L
Liu Jicong 已提交
148 149 150 151 152
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
153 154 155
  // int8_t  hashMethod;
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
156 157
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
158
// Callback type for the table sink. streamTaskOutput() invokes it with the
// task, tbSink.vnode, ver = 0 and the block array (pBlock->blocks) as `data`.
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
159

L
Liu Jicong 已提交
160
typedef struct {
L
Liu Jicong 已提交
161
  int64_t         stbUid;
L
Liu Jicong 已提交
162
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
163
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
164
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
165 166
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
167
  STSchema* pTSchema;
L
Liu Jicong 已提交
168 169 170
  SHashObj* pHash;  // groupId to tbuid
} STaskSinkTb;

L
Liu Jicong 已提交
171
// Callback type for the SMA sink. streamTaskOutput() invokes it with
// smaSink.vnode, smaSink.smaId and the block array (pBlock->blocks) as `data`.
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
172

L
Liu Jicong 已提交
173
typedef struct {
L
Liu Jicong 已提交
174 175
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
176
  void*     vnode;
L
Liu Jicong 已提交
177
  FSmaSink* smaSink;
L
Liu Jicong 已提交
178 179 180 181 182 183 184 185
} STaskSinkSma;

// Sink state for the fetch sink member of SStreamTask; it currently holds
// nothing but a placeholder.
typedef struct {
  int8_t reserved;  // placeholder so the struct is not empty
} STaskSinkFetch;

// Source kinds for a task; numbering starts at 1.
enum {
  TASK_SOURCE__SCAN = 1,
  TASK_SOURCE__PIPE,
  TASK_SOURCE__MERGE,
};

// Execution kinds for a task; numbering starts at 1.
// NOTE(review): presumably the values stored in SStreamTask.execType — confirm.
enum {
  TASK_EXEC__NONE = 1,
  TASK_EXEC__PIPE,
  TASK_EXEC__MERGE,
};

// Values of SStreamTask.dispatchType, spelled out explicitly.
// streamTaskOutput() requires NONE for local sinks and a non-NONE value
// when the block is queued for dispatch.
enum {
  TASK_DISPATCH__NONE    = 1,
  TASK_DISPATCH__FIXED   = 2,
  TASK_DISPATCH__SHUFFLE = 3,
};

// Values of SStreamTask.sinkType, spelled out explicitly.
// streamTaskOutput() handles TABLE and SMA locally; anything else is
// written to the output queue.
enum {
  TASK_SINK__NONE  = 1,
  TASK_SINK__TABLE = 2,
  TASK_SINK__SMA   = 3,
  TASK_SINK__FETCH = 4,
};

L
Liu Jicong 已提交
209 210 211 212 213
// Kinds of input a task accepts, spelled out explicitly.
// ("SUMBIT" is the established spelling of the identifier.)
enum {
  TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
  TASK_INPUT_TYPE__DATA_BLOCK   = 2,
};

214 215 216 217 218
// Values of SStreamTask.triggerStatus, spelled out explicitly.
// streamTaskInput() flips IN_ACTIVE to ACTIVE when non-trigger,
// non-checkpoint input arrives and triggerParam is non-zero.
enum {
  TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE    = 2,
};

L
Liu Jicong 已提交
219 220 221 222 223 224 225
// Location of one child task; elements of SStreamTask.childEpInfo point to
// this type. Encoded/decoded by tEncodeStreamEpInfo / tDecodeStreamEpInfo.
typedef struct {
  int32_t nodeId;   // node hosting the child task
  int32_t childId;  // index of the child
  int32_t taskId;   // task id of the child
  SEpSet  epSet;    // endpoint set used to reach the node
} SStreamChildEpInfo;

L
Liu Jicong 已提交
226
struct SStreamTask {
L
Liu Jicong 已提交
227 228
  int64_t streamId;
  int32_t taskId;
229
  int8_t  isDataScan;
L
Liu Jicong 已提交
230 231 232 233 234
  int8_t  execType;
  int8_t  sinkType;
  int8_t  dispatchType;
  int16_t dispatchMsgType;

235 236
  int8_t taskStatus;
  int8_t execStatus;
L
Liu Jicong 已提交
237

L
Liu Jicong 已提交
238
  // node info
L
Liu Jicong 已提交
239
  int32_t selfChildId;
L
Liu Jicong 已提交
240 241 242
  int32_t nodeId;
  SEpSet  epSet;

L
Liu Jicong 已提交
243 244 245
  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>

L
Liu Jicong 已提交
246 247 248
  // exec
  STaskExec exec;

L
Liu Jicong 已提交
249
  // TODO: unify sink and dispatch
L
Liu Jicong 已提交
250 251

  //  local sink
L
Liu Jicong 已提交
252 253 254 255 256 257
  union {
    STaskSinkTb    tbSink;
    STaskSinkSma   smaSink;
    STaskSinkFetch fetchSink;
  };

L
Liu Jicong 已提交
258
  // remote dispatcher
L
Liu Jicong 已提交
259 260 261 262 263
  union {
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
  };

L
Liu Jicong 已提交
264 265
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
266 267 268

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
269

270 271 272 273 274
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

L
Liu Jicong 已提交
275
  // application storage
L
Liu Jicong 已提交
276
  // void* ahandle;
277 278 279

  // msg handle
  SMsgCb* pMsgCb;
L
Liu Jicong 已提交
280
};
L
Liu Jicong 已提交
281

L
Liu Jicong 已提交
282 283 284
// Serialization of child endpoint info.
// NOTE(review): presumably these return 0 on success and a negative value on
// failure, like other tEncode*/tDecode* helpers — confirm.
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

// Lifecycle and serialization of SStreamTask.
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
// Puts one item on the task's input queue.
//
// The task's inputStatus must be NORMAL on entry; it is held at PROCESSING
// while the item is enqueued and set back to NORMAL afterwards.
//
//  - STREAM_INPUT__DATA_SUBMIT: a clone made by streamSubmitRefClone() is
//    enqueued; pItem itself stays with the caller.
//  - DATA_BLOCK / DATA_RETRIEVE / CHECKPOINT / TRIGGER: pItem itself is
//    enqueued.
//  - any other type: nothing is enqueued.
//
// For every type except TRIGGER and CHECKPOINT, a non-zero triggerParam
// switches triggerStatus from IN_ACTIVE to ACTIVE.
//
// Returns 0 on success, or -1 if the submit clone could not be created (in
// which case inputStatus is left as FAILED).
//
// NOTE(review): if inputStatus is anything other than NORMAL (e.g. FAILED
// after an earlier error) this asserts, and spins if ASSERT is compiled out.
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
  // Read the tag once, up front: after taosWriteQitem() the item is in the
  // queue and may already have been consumed and released by another thread,
  // so pItem must not be dereferenced again.
  const int8_t type = pItem->type;

  while (1) {
    int8_t inputStatus =
        atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      break;
    }
    ASSERT(0);
  }

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
             type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRIGGER) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
  }

  if (type != STREAM_INPUT__TRIGGER && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
  }

  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
  return 0;
}

// Unconditionally marks the task's input side as failed by storing
// TASK_INPUT_STATUS__FAILED into inputStatus.
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

// Delivers one output block according to the task's sink type.
//
//  - TASK_SINK__TABLE / TASK_SINK__SMA: the block array is passed to the
//    configured sink callback, then the array and the item are released here.
//  - otherwise: the item is written to the output queue for dispatch and is
//    not released here.
//
// In every case the caller must not use pBlock afterwards.
// Always returns 0.
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  if (pTask->sinkType == TASK_SINK__TABLE) {
    // Local sinks never dispatch.
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
    taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
    taosFreeQitem(pBlock);
  } else if (pTask->sinkType == TASK_SINK__SMA) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
    taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
    taosFreeQitem(pBlock);
  } else {
    // Anything queued for output must have a dispatcher configured.
    ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
  return 0;
}
L
Liu Jicong 已提交
345 346 347 348 349

// Response to a task deploy request; it currently holds nothing but a placeholder.
typedef struct {
  int32_t reserved;  // placeholder so the struct is not empty
} SStreamTaskDeployRsp;

L
Liu Jicong 已提交
350 351 352 353 354 355 356 357 358 359 360
// Request to deploy a task; wraps a pointer to the task being deployed.
typedef struct {
  // SMsgHead     head;
  SStreamTask* task;  // task to deploy
} SStreamTaskDeployReq;

// Request asking a task to run, identified by (streamId, taskId).
typedef struct {
  SMsgHead head;      // message header, first member of the struct
  int64_t  streamId;  // stream the task belongs to
  int32_t  taskId;    // task to run
} SStreamTaskRunReq;

L
Liu Jicong 已提交
361 362 363
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
364 365 366
  int32_t dataSrcVgId;
  int32_t upstreamTaskId;
  int32_t upstreamChildId;
L
Liu Jicong 已提交
367
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
368 369 370
#if 0
  int64_t sourceVer;
#endif
L
Liu Jicong 已提交
371 372 373
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
374 375 376 377 378 379 380 381
} SStreamDispatchReq;

// Response to a dispatch request; handled by streamProcessDispatchRsp().
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;  // NOTE(review): presumably a TASK_INPUT_STATUS__* value of the responder — confirm
} SStreamDispatchRsp;

L
Liu Jicong 已提交
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
// Retrieve request sent from a source task/node to a destination task/node.
// Decoded by tDecodeStreamRetrieveReq().
typedef struct {
  int64_t            streamId;
  int32_t            srcTaskId;
  int32_t            srcNodeId;
  int32_t            dstTaskId;
  int32_t            dstNodeId;
  int32_t            retrieveLen;  // NOTE(review): presumably the byte length of *pRetrieve — confirm
  SRetrieveTableRsp* pRetrieve;    // retrieve payload
} SStreamRetrieveReq;

// Response to a retrieve request; handled by streamProcessRetrieveRsp().
typedef struct {
  int64_t streamId;
  int32_t childId;
  int32_t rspFromTaskId;  // task sending this response
  int32_t rspToTaskId;    // task the response is addressed to
} SStreamRetrieveRsp;

L
Liu Jicong 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411
// Recover request; handled by streamProcessRecoverReq().
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t sourceTaskId;  // NOTE(review): presumably the task that issued the request — confirm
  int32_t sourceVg;      // NOTE(review): presumably the vgroup of that task — confirm
} SStreamTaskRecoverReq;

// Response to a recover request; handled by streamProcessRecoverRsp().
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;  // NOTE(review): presumably a TASK_INPUT_STATUS__* value of the responder — confirm
} SStreamTaskRecoverRsp;

412
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
L
Liu Jicong 已提交
413
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
414

L
Liu Jicong 已提交
415
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
416
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
417

L
Liu Jicong 已提交
418 419 420 421
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
422
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
L
Liu Jicong 已提交
423

L
Liu Jicong 已提交
424 425 426
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

L
Liu Jicong 已提交
427 428 429 430
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
431
#endif /* ifndef _STREAM_H_ */