dataDispatcher.c 8.4 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
18
#include "executorimpl.h"
X
Xiaoyu Wang 已提交
19 20
#include "planner.h"
#include "tcompression.h"
L
Liu Jicong 已提交
21
#include "tdatablock.h"
X
Xiaoyu Wang 已提交
22 23 24
#include "tglobal.h"
#include "tqueue.h"

25 26 27
// One queued output buffer. Instances are allocated as queue items in putDataBlock and the payload is
// allocated separately in allocBuf.
typedef struct SDataDispatchBuf {
  int32_t useSize;    // bytes in use: sizeof(SRetrieveTableRsp) + encoded block length (set in toDataCacheEntry)
  int32_t allocSize;  // bytes requested from taosMemoryMalloc for pData (set in allocBuf)
  char*   pData;      // heap buffer; its start is interpreted as an SDataCacheEntry
} SDataDispatchBuf;

// Header placed at the start of SDataDispatchBuf.pData; the encoded block follows in data[].
typedef struct SDataCacheEntry {
  int32_t dataLen;     // length of the encoded payload in data[], as reported by blockCompressEncode
  int32_t numOfRows;   // copied from the input block's info.rows
  int32_t numOfCols;   // copied from the input block's info.numOfCols
  int8_t  compressed;  // result of needCompress() for the block, stored as 0/1
  char    data[];      // encoded block (flexible array member, must remain last)
} SDataCacheEntry;
X
Xiaoyu Wang 已提交
38 39

typedef struct SDataDispatchHandle {
L
Liu Jicong 已提交
40 41
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
X
Xiaoyu Wang 已提交
42
  SDataBlockDescNode* pSchema;
L
Liu Jicong 已提交
43 44 45 46 47 48
  STaosQueue*         pDataBlocks;
  SDataDispatchBuf    nextOutput;
  int32_t             status;
  bool                queryEnd;
  uint64_t            useconds;
  TdThreadMutex       mutex;
X
Xiaoyu Wang 已提交
49 50
} SDataDispatchHandle;

L
Liu Jicong 已提交
51
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
X
Xiaoyu Wang 已提交
52 53 54 55
  if (tsCompressColData < 0 || 0 == pData->info.rows) {
    return false;
  }

X
Xiaoyu Wang 已提交
56
  for (int32_t col = 0; col < numOfCols; ++col) {
X
Xiaoyu Wang 已提交
57
    SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
L
Liu Jicong 已提交
58
    int32_t          colSize = pColRes->info.bytes * pData->info.rows;
X
Xiaoyu Wang 已提交
59 60 61 62 63 64 65 66
    if (NEEDTO_COMPRESS_QUERY(colSize)) {
      return true;
    }
  }

  return false;
}

H
Haojun Liao 已提交
67
// data format:
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// |                |  (4 bytes)   |(8 bytes) | sizeof(int32_t) * numOfCols          | actual size |           | actual size |           |
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of each bitmap is determined by the number of rows in this data block, and the length of each
// column's data is recorded in the first segment, next to the struct header.
74
/**
 * Fill pBuf->pData with an SDataCacheEntry header followed by the encoded input block.
 *
 * @param pHandle  dispatcher whose schema slot list provides the column count used for encoding
 * @param pInput   input whose pData block is encoded
 * @param pBuf     buffer with pData already allocated; on return useSize is
 *                 sizeof(SRetrieveTableRsp) plus the encoded length
 */
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  int32_t          slotCount = LIST_LENGTH(pHandle->pSchema->pSlots);
  SDataCacheEntry* pHeader = (SDataCacheEntry*)pBuf->pData;

  // header fields are filled before encoding; dataLen starts at 0 and is written by blockCompressEncode
  pHeader->compressed = (int8_t)needCompress(pInput->pData, slotCount);
  pHeader->numOfRows = pInput->pData->info.rows;
  pHeader->numOfCols = pInput->pData->info.numOfCols;
  pHeader->dataLen = 0;

  int32_t fixedPart = sizeof(SRetrieveTableRsp);
  blockCompressEncode(pInput->pData, pHeader->data, &pHeader->dataLen, slotCount, pHeader->compressed);

  pBuf->useSize = fixedPart + pHeader->dataLen;
}

89
/**
 * Allocate the payload buffer for one block about to be queued.
 *
 * @param pDispatcher  dispatcher whose queue length is checked against cfg.maxDataBlockNumPerQuery
 * @param pInput       input whose pData block determines the encoded size
 * @param pBuf         receives allocSize and pData
 * @return true when pBuf->pData was allocated; false when the queue already holds more items than the
 *         configured maximum (pBuf untouched) or when the allocation fails (pBuf->pData is NULL)
 */
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  uint32_t maxBlocks = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
  if (taosQueueItemSize(pDispatcher->pDataBlocks) > maxBlocks) {
    qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", maxBlocks,
           taosQueueItemSize(pDispatcher->pDataBlocks));
    return false;
  }

  pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData);
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
  if (NULL != pBuf->pData) {
    return true;
  }

  qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
  return false;
}

/**
 * Recompute the buffer status from the current queue length and store it in the dispatcher.
 *
 * @return DS_BUF_EMPTY when the queue holds no items, DS_BUF_LOW while the item count is below
 *         cfg.maxDataBlockNumPerQuery, DS_BUF_FULL otherwise
 */
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  int32_t newStatus;

  taosThreadMutexLock(&pDispatcher->mutex);
  int32_t queued = taosQueueItemSize(pDispatcher->pDataBlocks);
  if (0 == queued) {
    newStatus = DS_BUF_EMPTY;
  } else if (queued < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    newStatus = DS_BUF_LOW;
  } else {
    newStatus = DS_BUF_FULL;
  }
  pDispatcher->status = newStatus;
  taosThreadMutexUnlock(&pDispatcher->mutex);

  return newStatus;
}

/**
 * Read the dispatcher's stored buffer status while holding its mutex.
 *
 * @return the value last written to pDispatcher->status
 */
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  int32_t snapshot;

  taosThreadMutexLock(&pDispatcher->mutex);
  snapshot = pDispatcher->status;
  taosThreadMutexUnlock(&pDispatcher->mutex);

  return snapshot;
}

X
Xiaoyu Wang 已提交
125
/**
 * Encode one input block and append it to the dispatcher's queue.
 *
 * @param pHandle    sink handle; it is cast to SDataDispatchHandle*
 * @param pInput     input whose pData block is encoded
 * @param pContinue  out: true only when the buffer status after the insert is DS_BUF_LOW
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the queue item cannot be allocated or
 *         allocBuf() refuses (queue over capacity or payload allocation failure); *pContinue is not
 *         written on failure
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;

  SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
  if (NULL == pBuf) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  if (!allocBuf(pDispatcher, pInput, pBuf)) {
    // The item was never enqueued, so it must be released here or it is leaked.
    taosFreeQitem(pBuf);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  toDataCacheEntry(pDispatcher, pInput, pBuf);
  taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
  *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher));
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
137
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
138
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
wafwerar's avatar
wafwerar 已提交
139
  taosThreadMutexLock(&pDispatcher->mutex);
X
Xiaoyu Wang 已提交
140 141
  pDispatcher->queryEnd = true;
  pDispatcher->useconds = useconds;
wafwerar's avatar
wafwerar 已提交
142
  taosThreadMutexUnlock(&pDispatcher->mutex);
143
}
X
Xiaoyu Wang 已提交
144

X
Xiaoyu Wang 已提交
145
/**
 * Dequeue the next buffered block, stage it in pDispatcher->nextOutput and report its payload length.
 *
 * @param pHandle    sink handle; it is cast to SDataDispatchHandle*
 * @param pLen       out: dataLen of the staged SDataCacheEntry, or 0 when nothing could be dequeued
 * @param pQueryEnd  out: current value of the dispatcher's queryEnd flag
 */
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    return;
  }

  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  if (NULL == pBuf) {
    // Nothing was handed back even though the queue looked non-empty: report "no data" instead of
    // dereferencing a NULL item.
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    return;
  }

  // A previously staged payload that getDataBlock never consumed would be overwritten (and leaked) below.
  taosMemoryFreeClear(pDispatcher->nextOutput.pData);

  // Ownership of pBuf->pData moves to nextOutput; only the queue item wrapper is released here.
  memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
  taosFreeQitem(pBuf);

  *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
  *pQueryEnd = pDispatcher->queryEnd;
}

X
Xiaoyu Wang 已提交
161
/**
 * Copy the block staged by getDataLength into pOutput and release it.
 *
 * When no block is staged, the query is asserted to have ended and only useconds, precision, bufStatus
 * (DS_BUF_EMPTY) and queryEnd are written to pOutput.
 *
 * @param pHandle  sink handle; it is cast to SDataDispatchHandle*
 * @param pOutput  destination; pOutput->pData receives dataLen bytes of encoded payload
 * @return TSDB_CODE_SUCCESS
 */
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;

  if (NULL != pSelf->nextOutput.pData) {
    SDataCacheEntry* pStaged = (SDataCacheEntry*)(pSelf->nextOutput.pData);
    memcpy(pOutput->pData, pStaged->data, pStaged->dataLen);
    pOutput->numOfRows = pStaged->numOfRows;
    pOutput->numOfCols = pStaged->numOfCols;
    pOutput->compressed = pStaged->compressed;
    taosMemoryFreeClear(pSelf->nextOutput.pData);  // todo persistent

    pOutput->bufStatus = updateStatus(pSelf);

    taosThreadMutexLock(&pSelf->mutex);
    pOutput->queryEnd = pSelf->queryEnd;
    pOutput->useconds = pSelf->useconds;
    pOutput->precision = pSelf->pSchema->precision;
    taosThreadMutexUnlock(&pSelf->mutex);
    return TSDB_CODE_SUCCESS;
  }

  // nothing staged: this path is only expected once the producer has called endPut
  assert(pSelf->queryEnd);
  pOutput->useconds = pSelf->useconds;
  pOutput->precision = pSelf->pSchema->precision;
  pOutput->bufStatus = DS_BUF_EMPTY;
  pOutput->queryEnd = pSelf->queryEnd;
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
185

186 187
/**
 * Release everything the dispatcher owns: the staged output payload, every queued buffer, the queue and
 * the mutex. The handle structure itself is not freed here.
 *
 * @param pHandle  sink handle; it is cast to SDataDispatchHandle*
 * @return TSDB_CODE_SUCCESS
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;

  taosMemoryFreeClear(pSelf->nextOutput.pData);

  // drain the queue, freeing each payload before its queue item
  for (;;) {
    if (taosQueueEmpty(pSelf->pDataBlocks)) {
      break;
    }
    SDataDispatchBuf* pItem = NULL;
    taosReadQitem(pSelf->pDataBlocks, (void**)&pItem);
    taosMemoryFreeClear(pItem->pData);
    taosFreeQitem(pItem);
  }

  taosCloseQueue(pSelf->pDataBlocks);
  taosThreadMutexDestroy(&pSelf->mutex);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
200
/**
 * Create a data dispatcher sink and return it through pHandle.
 *
 * @param pManager   sink manager; stored for its cfg.maxDataBlockNumPerQuery limit
 * @param pDataSink  sink plan node; its pInputDataBlockDesc becomes the dispatcher's schema
 * @param pHandle    out: receives the new dispatcher on success, untouched on failure
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in terrno) when the dispatcher or
 *         its queue cannot be allocated
 */
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
  SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  // Open the queue before anything else that needs teardown, so the failure path only has to free the
  // dispatcher instead of leaking it together with an initialised mutex.
  dispatcher->pDataBlocks = taosOpenQueue();
  if (NULL == dispatcher->pDataBlocks) {
    taosMemoryFreeClear(dispatcher);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  taosThreadMutexInit(&dispatcher->mutex, NULL);

  dispatcher->sink.fPut = putDataBlock;
  dispatcher->sink.fEndPut = endPut;
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
  dispatcher->sink.fDestroy = destroyDataSinker;
  dispatcher->pManager = pManager;
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;

  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;
}