dataDispatcher.c 9.9 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
18
#include "executorInt.h"
X
Xiaoyu Wang 已提交
19 20
#include "planner.h"
#include "tcompression.h"
L
Liu Jicong 已提交
21
#include "tdatablock.h"
X
Xiaoyu Wang 已提交
22 23 24
#include "tglobal.h"
#include "tqueue.h"

D
dapan1121 已提交
25 26
extern SDataSinkStat gDataSinkStat;

27 28 29
typedef struct SDataDispatchBuf {
  int32_t useSize;
  int32_t allocSize;
L
Liu Jicong 已提交
30
  char*   pData;
31 32 33 34 35
} SDataDispatchBuf;

typedef struct SDataCacheEntry {
  int32_t dataLen;
  int32_t numOfRows;
36
  int32_t numOfCols;
37 38 39
  int8_t  compressed;
  char    data[];
} SDataCacheEntry;
X
Xiaoyu Wang 已提交
40 41

typedef struct SDataDispatchHandle {
L
Liu Jicong 已提交
42 43
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
X
Xiaoyu Wang 已提交
44
  SDataBlockDescNode* pSchema;
L
Liu Jicong 已提交
45 46 47 48 49
  STaosQueue*         pDataBlocks;
  SDataDispatchBuf    nextOutput;
  int32_t             status;
  bool                queryEnd;
  uint64_t            useconds;
D
dapan1121 已提交
50
  uint64_t            cachedSize;
L
Liu Jicong 已提交
51
  TdThreadMutex       mutex;
X
Xiaoyu Wang 已提交
52 53
} SDataDispatchHandle;

54
// clang-format off
H
Haojun Liao 已提交
55
// data format:
H
Haojun Liao 已提交
56 57 58 59
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry |  version         | total length | numOfRows    |     group id     | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... |                |  (4 bytes)   |(8 bytes)
// |                |  sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols        | actual size |           |
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
H
Haojun Liao 已提交
60 61
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
62
// clang-format on
D
dapan1121 已提交
63
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
64
  int32_t numOfCols = 0;
65
  SNode*  pNode;
66 67 68 69 70 71
  FOREACH(pNode, pHandle->pSchema->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (pSlotDesc->output) {
      ++numOfCols;
    }
  }
72
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
H
Haojun Liao 已提交
73
  pEntry->compressed = 0;
L
Liu Jicong 已提交
74
  pEntry->numOfRows = pInput->pData->info.rows;
S
shenglian zhou 已提交
75
  pEntry->numOfCols = numOfCols;
L
Liu Jicong 已提交
76
  pEntry->dataLen = 0;
X
Xiaoyu Wang 已提交
77

D
dapan1121 已提交
78
  pBuf->useSize = sizeof(SDataCacheEntry);
H
Haojun Liao 已提交
79
  pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
X
Xiaoyu Wang 已提交
80 81
  //  ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
  //  ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
82

L
Liu Jicong 已提交
83
  pBuf->useSize += pEntry->dataLen;
84 85 86

  atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
  atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
X
Xiaoyu Wang 已提交
87 88
}

89
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
H
Hongze Cheng 已提交
90 91 92 93 94 95 96 97
  /*
    uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
    if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
      qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
             taosQueueItemSize(pDispatcher->pDataBlocks));
      return false;
    }
  */
H
Haojun Liao 已提交
98

D
dapan1121 已提交
99
  pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
H
Haojun Liao 已提交
100

wafwerar's avatar
wafwerar 已提交
101
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
H
Haojun Liao 已提交
102 103 104 105
  if (pBuf->pData == NULL) {
    qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
  }

106 107 108 109
  return NULL != pBuf->pData;
}

static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
wafwerar's avatar
wafwerar 已提交
110
  taosThreadMutexLock(&pDispatcher->mutex);
111
  int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks);
L
Liu Jicong 已提交
112 113 114
  int32_t status =
      (0 == blockNums ? DS_BUF_EMPTY
                      : (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
115
  pDispatcher->status = status;
wafwerar's avatar
wafwerar 已提交
116
  taosThreadMutexUnlock(&pDispatcher->mutex);
117 118 119 120
  return status;
}

static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
wafwerar's avatar
wafwerar 已提交
121
  taosThreadMutexLock(&pDispatcher->mutex);
122
  int32_t status = pDispatcher->status;
wafwerar's avatar
wafwerar 已提交
123
  taosThreadMutexUnlock(&pDispatcher->mutex);
124 125 126
  return status;
}

X
Xiaoyu Wang 已提交
127
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
dengyihao's avatar
dengyihao 已提交
128
  int32_t              code = 0;
X
Xiaoyu Wang 已提交
129
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
S
Shengliang Guan 已提交
130
  SDataDispatchBuf*    pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0);
D
dapan1121 已提交
131
  if (NULL == pBuf) {
S
Shengliang Guan 已提交
132
    return TSDB_CODE_OUT_OF_MEMORY;
133
  }
D
dapan1121 已提交
134 135 136

  if (!allocBuf(pDispatcher, pInput, pBuf)) {
    taosFreeQitem(pBuf);
S
Shengliang Guan 已提交
137
    return TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
138
  }
X
Xiaoyu Wang 已提交
139

140
  toDataCacheEntry(pDispatcher, pInput, pBuf);
dengyihao's avatar
dengyihao 已提交
141 142 143 144
  code = taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
  if (code != 0) {
    return code;
  }
145 146 147

  int32_t status = updateStatus(pDispatcher);
  *pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY);
148
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
149 150
}

D
dapan1121 已提交
151
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
152
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
wafwerar's avatar
wafwerar 已提交
153
  taosThreadMutexLock(&pDispatcher->mutex);
X
Xiaoyu Wang 已提交
154 155
  pDispatcher->queryEnd = true;
  pDispatcher->useconds = useconds;
wafwerar's avatar
wafwerar 已提交
156
  taosThreadMutexUnlock(&pDispatcher->mutex);
157
}
X
Xiaoyu Wang 已提交
158

D
dapan1121 已提交
159
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
160 161
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
X
Xiaoyu Wang 已提交
162 163 164
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    return;
165
  }
166

167 168
  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
X
Xiaoyu Wang 已提交
169 170 171 172
  if (pBuf != NULL) {
    memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
    taosFreeQitem(pBuf);
  }
H
Haojun Liao 已提交
173 174 175 176

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
  *pLen = pEntry->dataLen;

X
Xiaoyu Wang 已提交
177 178
  //  ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
  //  ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
H
Haojun Liao 已提交
179

L
Liu Jicong 已提交
180
  *pQueryEnd = pDispatcher->queryEnd;
H
Hongze Cheng 已提交
181 182
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
         ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
X
Xiaoyu Wang 已提交
183 184
}

X
Xiaoyu Wang 已提交
185
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
186
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
187
  if (NULL == pDispatcher->nextOutput.pData) {
H
Haojun Liao 已提交
188
    ASSERT(pDispatcher->queryEnd);
189
    pOutput->useconds = pDispatcher->useconds;
X
Xiaoyu Wang 已提交
190
    pOutput->precision = pDispatcher->pSchema->precision;
D
dapan1121 已提交
191 192
    pOutput->bufStatus = DS_BUF_EMPTY;
    pOutput->queryEnd = pDispatcher->queryEnd;
193 194
    return TSDB_CODE_SUCCESS;
  }
195 196 197
  SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
  memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
  pOutput->numOfRows = pEntry->numOfRows;
198
  pOutput->numOfCols = pEntry->numOfCols;
199
  pOutput->compressed = pEntry->compressed;
D
dapan1121 已提交
200

201 202
  atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
D
dapan1121 已提交
203

wafwerar's avatar
wafwerar 已提交
204
  taosMemoryFreeClear(pDispatcher->nextOutput.pData);  // todo persistent
X
Xiaoyu Wang 已提交
205
  pOutput->bufStatus = updateStatus(pDispatcher);
wafwerar's avatar
wafwerar 已提交
206
  taosThreadMutexLock(&pDispatcher->mutex);
X
Xiaoyu Wang 已提交
207 208
  pOutput->queryEnd = pDispatcher->queryEnd;
  pOutput->useconds = pDispatcher->useconds;
X
Xiaoyu Wang 已提交
209
  pOutput->precision = pDispatcher->pSchema->precision;
wafwerar's avatar
wafwerar 已提交
210
  taosThreadMutexUnlock(&pDispatcher->mutex);
D
dapan1121 已提交
211

212 213
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
214

215 216
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
D
dapan1121 已提交
217
  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
wafwerar's avatar
wafwerar 已提交
218
  taosMemoryFreeClear(pDispatcher->nextOutput.pData);
219 220 221
  while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
    SDataDispatchBuf* pBuf = NULL;
    taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
H
Haojun Liao 已提交
222 223 224 225
    if (pBuf != NULL) {
      taosMemoryFreeClear(pBuf->pData);
      taosFreeQitem(pBuf);
    }
226 227
  }
  taosCloseQueue(pDispatcher->pDataBlocks);
wafwerar's avatar
wafwerar 已提交
228
  taosThreadMutexDestroy(&pDispatcher->mutex);
D
dapan1121 已提交
229
  taosMemoryFree(pDispatcher->pManager);
230
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
231 232
}

D
dapan1121 已提交
233
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
D
dapan1121 已提交
234 235 236 237 238 239
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;

  *size = atomic_load_64(&pDispatcher->cachedSize);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
240
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
wafwerar's avatar
wafwerar 已提交
241
  SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
X
Xiaoyu Wang 已提交
242
  if (NULL == dispatcher) {
S
Shengliang Guan 已提交
243
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
244
    goto _return;
X
Xiaoyu Wang 已提交
245 246
  }
  dispatcher->sink.fPut = putDataBlock;
D
dapan1121 已提交
247
  dispatcher->sink.fEndPut = endPut;
248 249
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
X
Xiaoyu Wang 已提交
250
  dispatcher->sink.fDestroy = destroyDataSinker;
D
dapan1121 已提交
251
  dispatcher->sink.fGetCacheSize = getCacheSize;
252
  dispatcher->pManager = pManager;
X
Xiaoyu Wang 已提交
253
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
X
Xiaoyu Wang 已提交
254 255
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;
X
Xiaoyu Wang 已提交
256
  dispatcher->pDataBlocks = taosOpenQueue();
wafwerar's avatar
wafwerar 已提交
257
  taosThreadMutexInit(&dispatcher->mutex, NULL);
X
Xiaoyu Wang 已提交
258
  if (NULL == dispatcher->pDataBlocks) {
X
Xiaoyu Wang 已提交
259
    taosMemoryFree(dispatcher);
S
Shengliang Guan 已提交
260
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
261
    goto _return;
X
Xiaoyu Wang 已提交
262 263 264
  }
  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
265 266 267 268 269

_return:

  taosMemoryFree(pManager);
  return terrno;
X
Xiaoyu Wang 已提交
270
}