tstream.h 11.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "executor.h"
L
Liu Jicong 已提交
17
#include "os.h"
L
Liu Jicong 已提交
18
#include "query.h"
L
Liu Jicong 已提交
19 20 21
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
L
Liu Jicong 已提交
22
#include "tqueue.h"
L
Liu Jicong 已提交
23 24 25 26 27 28
#include "trpc.h"

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
29 30
#ifndef _STREAM_H_
#define _STREAM_H_
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33
typedef struct SStreamTask SStreamTask;

L
Liu Jicong 已提交
34
enum {
L
Liu Jicong 已提交
35 36 37 38 39 40 41 42
  TASK_STATUS__NORMAL = 0,
  TASK_STATUS__DROPPING,
};

enum {
  TASK_EXEC_STATUS__IDLE = 1,
  TASK_EXEC_STATUS__EXECUTING,
  TASK_EXEC_STATUS__CLOSING,
L
Liu Jicong 已提交
43 44 45 46 47 48
};

enum {
  TASK_INPUT_STATUS__NORMAL = 1,
  TASK_INPUT_STATUS__BLOCKED,
  TASK_INPUT_STATUS__RECOVER,
L
Liu Jicong 已提交
49
  TASK_INPUT_STATUS__PROCESSING,
L
Liu Jicong 已提交
50
  TASK_INPUT_STATUS__STOP,
L
Liu Jicong 已提交
51
  TASK_INPUT_STATUS__FAILED,
L
Liu Jicong 已提交
52 53 54 55 56 57
};

enum {
  TASK_OUTPUT_STATUS__NORMAL = 1,
  TASK_OUTPUT_STATUS__WAIT,
  TASK_OUTPUT_STATUS__BLOCKED,
L
Liu Jicong 已提交
58 59
};

L
Liu Jicong 已提交
60 61 62 63
typedef struct {
  int8_t type;
} SStreamQueueItem;

L
Liu Jicong 已提交
64
typedef struct {
L
Liu Jicong 已提交
65 66
  int8_t      type;
  int64_t     ver;
L
Liu Jicong 已提交
67 68 69 70 71 72 73
  int32_t*    dataRef;
  SSubmitReq* data;
} SStreamDataSubmit;

typedef struct {
  int8_t type;

L
Liu Jicong 已提交
74
  int32_t srcVgId;
L
Liu Jicong 已提交
75 76 77 78 79 80 81 82 83
  int64_t sourceVer;

  SArray* blocks;  // SArray<SSDataBlock*>
} SStreamDataBlock;

typedef struct {
  int8_t type;
} SStreamCheckpoint;

84 85 86 87 88
typedef struct {
  int8_t       type;
  SSDataBlock* pBlock;
} SStreamTrigger;

L
Liu Jicong 已提交
89 90 91 92 93 94
enum {
  STREAM_QUEUE__SUCESS = 1,
  STREAM_QUEUE__FAILED,
  STREAM_QUEUE__PROCESSING,
};

L
Liu Jicong 已提交
95 96 97 98
typedef struct {
  STaosQueue* queue;
  STaosQall*  qall;
  void*       qItem;
L
Liu Jicong 已提交
99 100
  int8_t      status;
} SStreamQueue;
L
Liu Jicong 已提交
101

102 103 104
int32_t streamInit();
void    streamCleanUp();

L
Liu Jicong 已提交
105 106 107 108 109 110 111
SStreamQueue* streamQueueOpen();
void          streamQueueClose(SStreamQueue* queue);

static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  queue->qItem = NULL;
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
L
Liu Jicong 已提交
112 113
}

L
Liu Jicong 已提交
114 115 116 117 118 119 120 121 122 123
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
  ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}

static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }

static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
  int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
  if (dequeueFlag == STREAM_QUEUE__FAILED) {
L
Liu Jicong 已提交
124
    ASSERT(queue->qItem != NULL);
L
Liu Jicong 已提交
125
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
126 127 128 129 130 131
  } else {
    taosGetQitem(queue->qall, &queue->qItem);
    if (queue->qItem == NULL) {
      taosReadAllQitems(queue->queue, queue->qall);
      taosGetQitem(queue->qall, &queue->qItem);
    }
L
Liu Jicong 已提交
132
    return streamQueueCurItem(queue);
L
Liu Jicong 已提交
133 134 135
  }
}

L
Liu Jicong 已提交
136
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq);
L
Liu Jicong 已提交
137

L
Liu Jicong 已提交
138
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
L
Liu Jicong 已提交
139

L
Liu Jicong 已提交
140 141
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);

L
Liu Jicong 已提交
142
typedef struct {
L
Liu Jicong 已提交
143
  char* qmsg;
L
Liu Jicong 已提交
144
  // followings are not applicable to encoder and decoder
L
Liu Jicong 已提交
145
  void* executor;
L
Liu Jicong 已提交
146 147 148
} STaskExec;

typedef struct {
L
Liu Jicong 已提交
149
  int32_t taskId;
L
Liu Jicong 已提交
150 151 152 153 154
  int32_t nodeId;
  SEpSet  epSet;
} STaskDispatcherFixedEp;

typedef struct {
L
Liu Jicong 已提交
155 156 157
  // int8_t  hashMethod;
  char      stbFullName[TSDB_TABLE_FNAME_LEN];
  SUseDbRsp dbInfo;
L
Liu Jicong 已提交
158 159
} STaskDispatcherShuffle;

L
Liu Jicong 已提交
160
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
L
Liu Jicong 已提交
161

L
Liu Jicong 已提交
162
typedef struct {
L
Liu Jicong 已提交
163
  int64_t         stbUid;
L
Liu Jicong 已提交
164
  char            stbFullName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
165
  SSchemaWrapper* pSchemaWrapper;
L
Liu Jicong 已提交
166
  // not applicable to encoder and decoder
L
Liu Jicong 已提交
167 168
  void*     vnode;
  FTbSink*  tbSinkFunc;
L
Liu Jicong 已提交
169
  STSchema* pTSchema;
L
Liu Jicong 已提交
170 171 172
  SHashObj* pHash;  // groupId to tbuid
} STaskSinkTb;

L
Liu Jicong 已提交
173
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
174

L
Liu Jicong 已提交
175
typedef struct {
L
Liu Jicong 已提交
176 177
  int64_t smaId;
  // following are not applicable to encoder and decoder
L
Liu Jicong 已提交
178
  void*     vnode;
L
Liu Jicong 已提交
179
  FSmaSink* smaSink;
L
Liu Jicong 已提交
180 181 182 183 184 185 186 187
} STaskSinkSma;

typedef struct {
  int8_t reserved;
} STaskSinkFetch;

enum {
  TASK_SOURCE__SCAN = 1,
L
Liu Jicong 已提交
188 189
  TASK_SOURCE__PIPE,
  TASK_SOURCE__MERGE,
L
Liu Jicong 已提交
190 191 192 193
};

enum {
  TASK_EXEC__NONE = 1,
L
Liu Jicong 已提交
194 195
  TASK_EXEC__PIPE,
  TASK_EXEC__MERGE,
L
Liu Jicong 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
};

enum {
  TASK_DISPATCH__NONE = 1,
  TASK_DISPATCH__FIXED,
  TASK_DISPATCH__SHUFFLE,
};

enum {
  TASK_SINK__NONE = 1,
  TASK_SINK__TABLE,
  TASK_SINK__SMA,
  TASK_SINK__FETCH,
};

L
Liu Jicong 已提交
211 212 213 214 215
enum {
  TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
  TASK_INPUT_TYPE__DATA_BLOCK,
};

216 217 218 219 220
enum {
  TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
  TASK_TRIGGER_STATUS__ACTIVE,
};

L
Liu Jicong 已提交
221 222 223 224 225 226 227
typedef struct {
  int32_t nodeId;
  int32_t childId;
  int32_t taskId;
  SEpSet  epSet;
} SStreamChildEpInfo;

228
typedef struct SStreamTask {
L
Liu Jicong 已提交
229 230
  int64_t streamId;
  int32_t taskId;
231
  int8_t  isDataScan;
L
Liu Jicong 已提交
232 233 234 235 236
  int8_t  execType;
  int8_t  sinkType;
  int8_t  dispatchType;
  int16_t dispatchMsgType;

237 238
  int8_t taskStatus;
  int8_t execStatus;
L
Liu Jicong 已提交
239

L
Liu Jicong 已提交
240
  // node info
L
Liu Jicong 已提交
241
  int32_t selfChildId;
L
Liu Jicong 已提交
242 243 244
  int32_t nodeId;
  SEpSet  epSet;

L
Liu Jicong 已提交
245 246 247
  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>

L
Liu Jicong 已提交
248 249 250
  // exec
  STaskExec exec;

L
Liu Jicong 已提交
251
  // TODO: unify sink and dispatch
L
Liu Jicong 已提交
252 253

  //  local sink
L
Liu Jicong 已提交
254 255 256 257 258 259
  union {
    STaskSinkTb    tbSink;
    STaskSinkSma   smaSink;
    STaskSinkFetch fetchSink;
  };

L
Liu Jicong 已提交
260
  // remote dispatcher
L
Liu Jicong 已提交
261 262 263 264 265
  union {
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
  };

L
Liu Jicong 已提交
266 267
  int8_t inputStatus;
  int8_t outputStatus;
L
Liu Jicong 已提交
268 269 270

  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;
L
Liu Jicong 已提交
271

272 273 274 275 276
  // trigger
  int8_t  triggerStatus;
  int64_t triggerParam;
  void*   timer;

L
Liu Jicong 已提交
277
  // application storage
L
Liu Jicong 已提交
278
  // void* ahandle;
279 280 281

  // msg handle
  SMsgCb* pMsgCb;
282
} SStreamTask;
L
Liu Jicong 已提交
283

L
Liu Jicong 已提交
284 285 286
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);

L
Liu Jicong 已提交
287
SStreamTask* tNewSStreamTask(int64_t streamId);
H
Hongze Cheng 已提交
288 289
int32_t      tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t      tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
L
Liu Jicong 已提交
290 291
void         tFreeSStreamTask(SStreamTask* pTask);

L
Liu Jicong 已提交
292
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
293
#if 0
L
Liu Jicong 已提交
294 295 296 297 298 299 300 301
  while (1) {
    int8_t inputStatus =
        atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      break;
    }
    ASSERT(0);
  }
302
#endif
L
Liu Jicong 已提交
303 304 305 306 307 308 309

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
    if (pSubmitClone == NULL) {
      atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
      return -1;
    }
L
Liu Jicong 已提交
310
    qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
L
Liu Jicong 已提交
311
    taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
L
Liu Jicong 已提交
312
    // qStreamInput(pTask->exec.executor, pSubmitClone);
5
54liuyao 已提交
313
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
L
Liu Jicong 已提交
314
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
315
    // qStreamInput(pTask->exec.executor, pItem);
L
Liu Jicong 已提交
316 317
  } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
318
    // qStreamInput(pTask->exec.executor, pItem);
319 320
  } else if (pItem->type == STREAM_INPUT__TRIGGER) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
L
Liu Jicong 已提交
321
    // qStreamInput(pTask->exec.executor, pItem);
322 323
  }

L
Liu Jicong 已提交
324 325
  if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
    atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
L
Liu Jicong 已提交
326 327
  }

328
#if 0
L
Liu Jicong 已提交
329 330
  // TODO: back pressure
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
331
#endif
L
Liu Jicong 已提交
332 333 334 335 336 337 338 339
  return 0;
}

static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}

static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
340 341 342
  if (pTask->sinkType == TASK_SINK__TABLE) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
    pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
343
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
344
    taosFreeQitem(pBlock);
345 346
  } else if (pTask->sinkType == TASK_SINK__SMA) {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
L
Liu Jicong 已提交
347
    pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
348
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
349
    taosFreeQitem(pBlock);
350 351 352 353
  } else {
    ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
    taosWriteQitem(pTask->outputQueue->queue, pBlock);
  }
L
Liu Jicong 已提交
354 355
  return 0;
}
L
Liu Jicong 已提交
356 357 358 359 360

typedef struct {
  int32_t reserved;
} SStreamTaskDeployRsp;

L
Liu Jicong 已提交
361 362 363 364 365 366 367 368 369 370 371
typedef struct {
  // SMsgHead     head;
  SStreamTask* task;
} SStreamTaskDeployReq;

typedef struct {
  SMsgHead head;
  int64_t  streamId;
  int32_t  taskId;
} SStreamTaskRunReq;

L
Liu Jicong 已提交
372 373 374
typedef struct {
  int64_t streamId;
  int32_t taskId;
L
Liu Jicong 已提交
375 376 377
  int32_t dataSrcVgId;
  int32_t upstreamTaskId;
  int32_t upstreamChildId;
L
Liu Jicong 已提交
378
  int32_t upstreamNodeId;
L
Liu Jicong 已提交
379 380 381
#if 0
  int64_t sourceVer;
#endif
L
Liu Jicong 已提交
382 383 384
  int32_t blockNum;
  SArray* dataLen;  // SArray<int32_t>
  SArray* data;     // SArray<SRetrieveTableRsp*>
L
Liu Jicong 已提交
385 386 387 388 389 390 391 392
} SStreamDispatchReq;

typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamDispatchRsp;

L
Liu Jicong 已提交
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
typedef struct {
  int64_t            streamId;
  int32_t            srcTaskId;
  int32_t            srcNodeId;
  int32_t            dstTaskId;
  int32_t            dstNodeId;
  int32_t            retrieveLen;
  SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;

typedef struct {
  int64_t streamId;
  int32_t childId;
  int32_t rspFromTaskId;
  int32_t rspToTaskId;
} SStreamRetrieveRsp;

L
Liu Jicong 已提交
410 411 412 413 414 415 416 417 418 419 420 421 422
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t sourceTaskId;
  int32_t sourceVg;
} SStreamTaskRecoverReq;

typedef struct {
  int64_t streamId;
  int32_t taskId;
  int8_t  inputStatus;
} SStreamTaskRecoverRsp;

423
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
L
Liu Jicong 已提交
424
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
425

L
Liu Jicong 已提交
426
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
427
int32_t streamSetupTrigger(SStreamTask* pTask);
L
Liu Jicong 已提交
428

L
Liu Jicong 已提交
429 430 431 432
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
433
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
L
Liu Jicong 已提交
434

L
Liu Jicong 已提交
435 436 437
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);

L
Liu Jicong 已提交
438 439 440 441
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
442
#endif /* ifndef _STREAM_H_ */