dataDispatcher.c 9.0 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "planner.h"
#include "tcompression.h"
#include "tglobal.h"
#include "tqueue.h"
X
Xiaoyu Wang 已提交
22
#include "executorimpl.h"
X
Xiaoyu Wang 已提交
23

24
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
X
Xiaoyu Wang 已提交
25

26 27 28
typedef struct SDataDispatchBuf {
  int32_t useSize;
  int32_t allocSize;
X
Xiaoyu Wang 已提交
29
  char* pData;
30 31 32 33 34 35 36 37
} SDataDispatchBuf;

typedef struct SDataCacheEntry {
  int32_t dataLen;
  int32_t numOfRows;
  int8_t  compressed;
  char    data[];
} SDataCacheEntry;
X
Xiaoyu Wang 已提交
38 39 40

typedef struct SDataDispatchHandle {
  SDataSinkHandle sink;
41
  SDataSinkManager* pManager;
X
Xiaoyu Wang 已提交
42 43
  SDataBlockSchema schema;
  STaosQueue* pDataBlocks;
44 45
  SDataDispatchBuf nextOutput;
  int32_t status;
X
Xiaoyu Wang 已提交
46 47
  bool queryEnd;
  int64_t useconds;
48
  pthread_mutex_t mutex;
X
Xiaoyu Wang 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
} SDataDispatchHandle;

static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
  if (tsCompressColData < 0 || 0 == pData->info.rows) {
    return false;
  }

  for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
    SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
    int32_t colSize = pColRes->info.bytes * pData->info.rows;
    if (NEEDTO_COMPRESS_QUERY(colSize)) {
      return true;
    }
  }

  return false;
}

67
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
X
Xiaoyu Wang 已提交
68 69 70 71 72
  int32_t colSize = pColRes->info.bytes * numOfRows;
  return (*(tDataTypes[pColRes->info.type].compFunc))(
      pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

73
static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
X
Xiaoyu Wang 已提交
74 75 76 77 78 79
  int32_t *compSizes = (int32_t*)data;
  if (compressed) {
    data += pSchema->numOfCols * sizeof(int32_t);
  }

  for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
80
    SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
X
Xiaoyu Wang 已提交
81
    if (compressed) {
82
      compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
X
Xiaoyu Wang 已提交
83 84 85 86
      data += compSizes[col];
      *compLen += compSizes[col];
      compSizes[col] = htonl(compSizes[col]);
    } else {
87 88
      memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows);
      data += pColRes->info.bytes * pInput->pData->info.rows;
X
Xiaoyu Wang 已提交
89 90 91
    }
  }

92
  int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
X
Xiaoyu Wang 已提交
93 94 95
  *(int32_t*)data = htonl(numOfTables);
  data += sizeof(int32_t);

96
  STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
X
Xiaoyu Wang 已提交
97 98 99 100 101
  while (item) {
    STableIdInfo* pDst = (STableIdInfo*)data;
    pDst->uid = htobe64(item->uid);
    pDst->key = htobe64(item->key);
    data += sizeof(STableIdInfo);
102
    item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
X
Xiaoyu Wang 已提交
103 104 105
  }
}

106 107 108 109 110 111
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
  pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
  pEntry->numOfRows = pInput->pData->info.rows;
D
dapan1121 已提交
112
  pEntry->dataLen = 0;
X
Xiaoyu Wang 已提交
113

114 115
  pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
  copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
D
dapan1121 已提交
116 117 118 119
  if (0 == pEntry->compressed) {
    pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows;
  }
  pBuf->useSize += pEntry->dataLen;
X
Xiaoyu Wang 已提交
120 121 122
  // todo completed
}

123 124 125 126 127 128 129 130 131 132 133
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    return false;
  }
  pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
  pBuf->pData = malloc(pBuf->allocSize);
  return NULL != pBuf->pData;
}

static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  pthread_mutex_lock(&pDispatcher->mutex);
X
Xiaoyu Wang 已提交
134 135 136
  int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
  int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
      (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
137 138 139 140 141 142 143 144 145 146 147 148
  pDispatcher->status = status;
  pthread_mutex_unlock(&pDispatcher->mutex);
  return status;
}

static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  pthread_mutex_lock(&pDispatcher->mutex);
  int32_t status = pDispatcher->status;
  pthread_mutex_unlock(&pDispatcher->mutex);
  return status;
}

X
Xiaoyu Wang 已提交
149
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
X
Xiaoyu Wang 已提交
150
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
151 152 153 154 155 156
  SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
  if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  toDataCacheEntry(pDispatcher, pInput, pBuf);
  taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
X
Xiaoyu Wang 已提交
157
  *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
158
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
159 160
}

X
Xiaoyu Wang 已提交
161
static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
162 163
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  pthread_mutex_lock(&pDispatcher->mutex);
X
Xiaoyu Wang 已提交
164 165
  pDispatcher->queryEnd = true;
  pDispatcher->useconds = useconds;
166 167
  pthread_mutex_unlock(&pDispatcher->mutex);
}
X
Xiaoyu Wang 已提交
168

X
Xiaoyu Wang 已提交
169
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
170 171
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
X
Xiaoyu Wang 已提交
172 173 174
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    return;
175 176 177 178 179
  }
  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
  taosFreeQitem(pBuf);
X
Xiaoyu Wang 已提交
180
  *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
X
Xiaoyu Wang 已提交
181 182
}

X
Xiaoyu Wang 已提交
183
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
184
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
185 186 187 188 189 190
  if (NULL == pDispatcher->nextOutput.pData) {
    assert(pDispatcher->queryEnd);
    pOutput->useconds = pDispatcher->useconds;
    pOutput->precision = pDispatcher->schema.precision;
    return TSDB_CODE_SUCCESS;
  }
191 192 193 194 195
  SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
  memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
  pOutput->numOfRows = pEntry->numOfRows;
  pOutput->compressed = pEntry->compressed;
  tfree(pDispatcher->nextOutput.pData);  // todo persistent
X
Xiaoyu Wang 已提交
196 197 198 199 200 201 202
  pOutput->bufStatus = updateStatus(pDispatcher);
  pthread_mutex_lock(&pDispatcher->mutex);
  pOutput->queryEnd = pDispatcher->queryEnd;
  pOutput->needSchedule = false;
  pOutput->useconds = pDispatcher->useconds;
  pOutput->precision = pDispatcher->schema.precision;
  pthread_mutex_unlock(&pDispatcher->mutex);
203 204
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
205

206 207 208 209 210 211 212 213 214 215 216
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  tfree(pDispatcher->nextOutput.pData);
  while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
    SDataDispatchBuf* pBuf = NULL;
    taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
    tfree(pBuf->pData);
    taosFreeQitem(pBuf);
  }
  taosCloseQueue(pDispatcher->pDataBlocks);
  pthread_mutex_destroy(&pDispatcher->mutex);
X
Xiaoyu Wang 已提交
217 218
}

219
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) {
X
Xiaoyu Wang 已提交
220 221 222
  SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
223
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
224 225
  }
  dispatcher->sink.fPut = putDataBlock;
D
dapan1121 已提交
226
  dispatcher->sink.fEndPut = endPut;
227 228
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
X
Xiaoyu Wang 已提交
229
  dispatcher->sink.fDestroy = destroyDataSinker;
230 231
  dispatcher->pManager = pManager;
  dispatcher->schema = pDataSink->schema;
X
Xiaoyu Wang 已提交
232 233
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;
X
Xiaoyu Wang 已提交
234
  dispatcher->pDataBlocks = taosOpenQueue();
235
  pthread_mutex_init(&dispatcher->mutex, NULL);
X
Xiaoyu Wang 已提交
236 237
  if (NULL == dispatcher->pDataBlocks) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
238
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
239 240 241 242
  }
  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;
}