dataDispatcher.c 9.9 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
18
#include "executorimpl.h"
X
Xiaoyu Wang 已提交
19 20 21 22
#include "planner.h"
#include "tcompression.h"
#include "tglobal.h"
#include "tqueue.h"
23
#include "tdatablock.h"
X
Xiaoyu Wang 已提交
24

25 26 27
// One serialized result block as it travels through the dispatcher queue.
// The struct itself is allocated as a queue item (taosAllocateQitem) while
// pData is a separate heap buffer obtained in allocBuf().
typedef struct SDataDispatchBuf {
  int32_t useSize;    // sizeof(SRetrieveTableRsp) + payload length, set by toDataCacheEntry()
  int32_t allocSize;  // number of bytes requested for pData in allocBuf()
  char*   pData;      // buffer that starts with an SDataCacheEntry header
} SDataDispatchBuf;

// Header placed at the start of SDataDispatchBuf::pData; the serialized
// columns follow immediately in the flexible array member.
typedef struct SDataCacheEntry {
  int32_t dataLen;     // bytes stored in data[], as accumulated by copyData()
  int32_t numOfRows;   // row count copied from the input block
  int8_t  compressed;  // result of needCompress(); non-zero => column payloads were compressed
  char    data[];      // per-column length table followed by the column sections
} SDataCacheEntry;
X
Xiaoyu Wang 已提交
37 38 39

typedef struct SDataDispatchHandle {
  SDataSinkHandle sink;
40
  SDataSinkManager* pManager;
X
Xiaoyu Wang 已提交
41
  SDataBlockDescNode* pSchema;
X
Xiaoyu Wang 已提交
42
  STaosQueue* pDataBlocks;
43 44
  SDataDispatchBuf nextOutput;
  int32_t status;
X
Xiaoyu Wang 已提交
45
  bool queryEnd;
D
dapan1121 已提交
46
  uint64_t useconds;
wafwerar's avatar
wafwerar 已提交
47
  TdThreadMutex mutex;
X
Xiaoyu Wang 已提交
48 49
} SDataDispatchHandle;

X
Xiaoyu Wang 已提交
50
static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSchema) {
X
Xiaoyu Wang 已提交
51 52 53 54
  if (tsCompressColData < 0 || 0 == pData->info.rows) {
    return false;
  }

X
Xiaoyu Wang 已提交
55 56
  int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
  for (int32_t col = 0; col < numOfCols; ++col) {
X
Xiaoyu Wang 已提交
57 58 59 60 61 62 63 64 65 66
    SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
    int32_t colSize = pColRes->info.bytes * pData->info.rows;
    if (NEEDTO_COMPRESS_QUERY(colSize)) {
      return true;
    }
  }

  return false;
}

67
// Run the type-specific compressor registered in tDataTypes[] over one column.
// `data` is the output buffer; the capacity passed to the compressor is the
// uncompressed length plus COMP_OVERFLOW_BYTES. The compressor's return value
// is passed through unchanged.
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
  const int32_t rawLen = colDataGetLength(pColRes, numOfRows);
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawLen, numOfRows, data,
                                                 rawLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

73
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) {
X
Xiaoyu Wang 已提交
74
  int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
75
  int32_t * colSizes = (int32_t*)data;
X
Xiaoyu Wang 已提交
76

77 78 79 80
  data += numOfCols * sizeof(int32_t);
  *dataLen = (numOfCols * sizeof(int32_t));

  int32_t numOfRows = pInput->pData->info.rows;
X
Xiaoyu Wang 已提交
81
  for (int32_t col = 0; col < numOfCols; ++col) {
82
    SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
83 84 85 86 87 88 89 90 91 92 93 94 95 96

    // copy the null bitmap
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
      size_t metaSize = numOfRows * sizeof(int32_t);
      memcpy(data, pColRes->varmeta.offset, metaSize);
      data += metaSize;
      (*dataLen) += metaSize;
    } else {
      int32_t len = BitmapLen(numOfRows);
      memcpy(data, pColRes->nullbitmap, len);
      data += len;
      (*dataLen) += len;
    }

X
Xiaoyu Wang 已提交
97
    if (compressed) {
98 99 100
      colSizes[col] = compressColData(pColRes, numOfRows, data, compressed);
      data += colSizes[col];
      (*dataLen) += colSizes[col];
X
Xiaoyu Wang 已提交
101
    } else {
102 103 104 105
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
      (*dataLen) += colSizes[col];
      memmove(data, pColRes->pData, colSizes[col]);
      data += colSizes[col];
X
Xiaoyu Wang 已提交
106
    }
107 108

    colSizes[col] = htonl(colSizes[col]);
X
Xiaoyu Wang 已提交
109 110 111
  }
}

H
Haojun Liao 已提交
112 113 114 115 116 117 118
// data format:
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// |                |    sizeof(int32_t) * numOfCols       | actual size |           | actual size |           |
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
119 120
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
X
Xiaoyu Wang 已提交
121
  pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
122 123
  pEntry->numOfRows  = pInput->pData->info.rows;
  pEntry->dataLen    = 0;
X
Xiaoyu Wang 已提交
124

H
Haojun Liao 已提交
125
  pBuf->useSize = sizeof(SRetrieveTableRsp);
X
Xiaoyu Wang 已提交
126
  copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
127 128 129

  pEntry->dataLen = pEntry->dataLen;
  pBuf->useSize  += pEntry->dataLen;
X
Xiaoyu Wang 已提交
130 131
}

132
// Allocate pBuf->pData large enough to hold the serialized form of pInput.
// Returns false, without allocating, when the queue already holds more than
// cfg.maxDataBlockNumPerQuery items, and false when the allocation fails.
// On success pBuf->allocSize and pBuf->pData are set and true is returned.
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  uint32_t maxBlocks = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
  bool     overLimit = taosQueueSize(pDispatcher->pDataBlocks) > maxBlocks;
  if (overLimit) {
    qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", maxBlocks,
           taosQueueSize(pDispatcher->pDataBlocks));
    return false;
  }

  // NOTE: there are four bytes of an integer more than the required buffer space.
  // struct size + data payload + length for each column + bitmap length
  pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) +
      ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows);

  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
  if (NULL != pBuf->pData) {
    return true;
  }

  qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
  return false;
}

// Recompute the buffer status from the current queue length, store it in the
// handle and return it. The whole computation runs with the handle mutex held.
//   0 queued blocks                         -> DS_BUF_EMPTY
//   fewer than cfg.maxDataBlockNumPerQuery  -> DS_BUF_LOW
//   otherwise                               -> DS_BUF_FULL
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  int32_t next;

  taosThreadMutexLock(&pDispatcher->mutex);
  int32_t queued = taosQueueSize(pDispatcher->pDataBlocks);
  if (0 == queued) {
    next = DS_BUF_EMPTY;
  } else if (queued < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    next = DS_BUF_LOW;
  } else {
    next = DS_BUF_FULL;
  }
  pDispatcher->status = next;
  taosThreadMutexUnlock(&pDispatcher->mutex);

  return next;
}

// Return the status value last stored by updateStatus(), read under the
// handle mutex.
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  int32_t snapshot;

  taosThreadMutexLock(&pDispatcher->mutex);
  snapshot = pDispatcher->status;
  taosThreadMutexUnlock(&pDispatcher->mutex);

  return snapshot;
}

X
Xiaoyu Wang 已提交
170
// fPut callback: serialize one input block and append it to the dispatcher queue.
//
// pHandle   - the dispatcher (an SDataDispatchHandle seen through its sink member)
// pInput    - block to serialize
// pContinue - set to true only when the buffer status after the insert is DS_BUF_LOW;
//             left untouched on failure
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the queue item
// cannot be allocated or allocBuf() refuses (allocation failure or queue over capacity).
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;

  SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
  if (NULL == pBuf) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  if (!allocBuf(pDispatcher, pInput, pBuf)) {
    // the queue item was never enqueued, so nobody else will release it
    taosFreeQitem(pBuf);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  toDataCacheEntry(pDispatcher, pInput, pBuf);
  taosWriteQitem(pDispatcher->pDataBlocks, pBuf);

  *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher));
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
182
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
183
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
wafwerar's avatar
wafwerar 已提交
184
  taosThreadMutexLock(&pDispatcher->mutex);
X
Xiaoyu Wang 已提交
185 186
  pDispatcher->queryEnd = true;
  pDispatcher->useconds = useconds;
wafwerar's avatar
wafwerar 已提交
187
  taosThreadMutexUnlock(&pDispatcher->mutex);
188
}
X
Xiaoyu Wang 已提交
189

X
Xiaoyu Wang 已提交
190
// fGetLen callback: dequeue the next cached block into pDispatcher->nextOutput
// and report its payload length.
//
// pLen      - receives SDataCacheEntry::dataLen of the dequeued block, or 0 when
//             nothing could be dequeued
// pQueryEnd - receives the current queryEnd flag
//
// NOTE(review): queryEnd is read here without taking the handle mutex, unlike
// endPut()/getDataBlock(); kept as in the original.
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    return;
  }

  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  if (NULL == pBuf) {
    // The emptiness check and the read are separate steps; if no item came
    // back, report "no data" instead of dereferencing a NULL pointer.
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    return;
  }

  // take over the payload pointer, then release only the queue item shell
  memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
  taosFreeQitem(pBuf);

  *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
  *pQueryEnd = pDispatcher->queryEnd;
}

X
Xiaoyu Wang 已提交
206
// fGetData callback: copy the block staged in nextOutput (by getDataLength)
// into pOutput->pData, release the staged buffer and fill in the bookkeeping
// fields of pOutput. Always returns TSDB_CODE_SUCCESS.
//
// When nothing is staged the function asserts that the query has ended and
// only sets useconds and precision.
// NOTE(review): on that early path numOfRows/compressed/bufStatus/queryEnd are
// left as the caller passed them - confirm callers initialise pOutput.
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;
  SDataCacheEntry*     pStaged = (SDataCacheEntry*)(pSelf->nextOutput.pData);

  if (NULL == pStaged) {
    assert(pSelf->queryEnd);
    pOutput->useconds = pSelf->useconds;
    pOutput->precision = pSelf->pSchema->precision;
    return TSDB_CODE_SUCCESS;
  }

  memcpy(pOutput->pData, pStaged->data, pStaged->dataLen);
  pOutput->numOfRows = pStaged->numOfRows;
  pOutput->compressed = pStaged->compressed;

  // frees the staged buffer and resets nextOutput.pData
  taosMemoryFreeClear(pSelf->nextOutput.pData);  // todo persistent

  pOutput->bufStatus = updateStatus(pSelf);

  taosThreadMutexLock(&pSelf->mutex);
  pOutput->queryEnd = pSelf->queryEnd;
  pOutput->useconds = pSelf->useconds;
  pOutput->precision = pSelf->pSchema->precision;
  taosThreadMutexUnlock(&pSelf->mutex);

  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
227

228 229
// fDestroy callback: release everything the dispatcher owns - the staged
// output buffer, every block still queued, the queue itself and the mutex.
// The SDataDispatchHandle structure itself is not freed here.
// Returns TSDB_CODE_SUCCESS.
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;

  taosMemoryFreeClear(pDispatcher->nextOutput.pData);

  while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
    SDataDispatchBuf* pBuf = NULL;
    taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
    if (NULL == pBuf) {
      // nothing was handed back; skip instead of dereferencing NULL
      continue;
    }
    taosMemoryFreeClear(pBuf->pData);
    taosFreeQitem(pBuf);
  }

  taosCloseQueue(pDispatcher->pDataBlocks);
  taosThreadMutexDestroy(&pDispatcher->mutex);

  // the function is declared int32_t; falling off the end left the result undefined
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
241
// Create a data dispatcher sink.
//
// pManager  - manager whose cfg.maxDataBlockNumPerQuery bounds the queue
// pDataSink - plan node; its pInputDataBlockDesc becomes the dispatcher schema
// pHandle   - receives the new handle on success; not written on failure
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in
// terrno) when the handle or its queue cannot be allocated.
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
  SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  dispatcher->sink.fPut = putDataBlock;
  dispatcher->sink.fEndPut = endPut;
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
  dispatcher->sink.fDestroy = destroyDataSinker;
  dispatcher->pManager = pManager;
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;

  dispatcher->pDataBlocks = taosOpenQueue();
  if (NULL == dispatcher->pDataBlocks) {
    // nothing else has been acquired yet, so only the handle has to be released
    taosMemoryFreeClear(dispatcher);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  // initialise the mutex only once the handle is certain to be returned
  taosThreadMutexInit(&dispatcher->mutex, NULL);

  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;
}