/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "planner.h"
#include "tcompression.h"
#include "tglobal.h"
#include "tqueue.h"

// Bytes reserved in a dispatch buffer besides the row data: the table counter,
// one STableIdInfo per entry of the `tables` hash, and sizeof(SRetrieveTableRsp).
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))

// One queued output buffer.
typedef struct SDataDispatchBuf {
  int32_t useSize;    // bytes accounted as used (computed in toDataCacheEntry)
  int32_t allocSize;  // bytes requested from malloc for pData (see allocBuf)
  char*   pData;      // heap buffer; its start is interpreted as an SDataCacheEntry
} SDataDispatchBuf;

// Header stored at the start of SDataDispatchBuf::pData.
typedef struct SDataCacheEntry {
  int32_t dataLen;     // payload length reported by getDataLength / copied by getDataBlock
  int32_t numOfRows;   // row count of the source block
  int8_t  compressed;  // non-zero when the column data was written compressed
  char    data[];      // serialized payload
} SDataCacheEntry;

// Dispatcher sink instance; `sink` is the first member so that the handle can be
// cast to and from SDataSinkHandle*.
typedef struct SDataDispatchHandle {
  SDataSinkHandle   sink;
  SDataSinkManager* pManager;
  SDataBlockSchema  schema;
  STaosQueue*       pDataBlocks;  // queue of SDataDispatchBuf items
  SDataDispatchBuf  nextOutput;   // buffer dequeued by getDataLength, consumed by getDataBlock
  int32_t           status;       // DS_* value, accessed under `mutex`
  pthread_mutex_t   mutex;
} SDataDispatchHandle;

/**
 * Decide whether the columns of a block should be written compressed.
 *
 * Returns false when tsCompressColData is negative or the block has no rows;
 * otherwise returns true as soon as one column's raw size
 * (bytes-per-value * rows) satisfies NEEDTO_COMPRESS_QUERY.
 */
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
  if (tsCompressColData < 0) {
    return false;
  }
  if (0 == pData->info.rows) {
    return false;
  }

  int32_t idx = 0;
  while (idx < pSchema->numOfCols) {
    SColumnInfoData* pColumn = taosArrayGet(pData->pDataBlock, idx);
    int32_t rawBytes = pColumn->info.bytes * pData->info.rows;
    if (NEEDTO_COMPRESS_QUERY(rawBytes)) {
      return true;
    }
    ++idx;
  }

  return false;
}

/**
 * Compress one column into `data` using the compression routine registered
 * for the column's data type in tDataTypes.
 *
 * The output capacity handed to the routine is the raw column size plus
 * COMP_OVERFLOW_BYTES; `compressed` is forwarded unchanged to the routine.
 * Returns whatever the type-specific routine returns (the callers in this
 * file treat it as the number of bytes written).
 */
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
  const int32_t rawBytes = pColRes->info.bytes * numOfRows;
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawBytes, numOfRows,
                                                 data, rawBytes + COMP_OVERFLOW_BYTES,
                                                 compressed, NULL, 0);
}
static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) { int32_t *compSizes = (int32_t*)data; if (compressed) { data += pSchema->numOfCols * sizeof(int32_t); } for (int32_t col = 0; col < pSchema->numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col); if (compressed) { compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed); data += compSizes[col]; *compLen += compSizes[col]; compSizes[col] = htonl(compSizes[col]); } else { memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows); data += pColRes->info.bytes * pInput->pData->info.rows; } } int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL); while (item) { STableIdInfo* pDst = (STableIdInfo*)data; pDst->uid = htobe64(item->uid); pDst->key = htobe64(item->key); data += sizeof(STableIdInfo); item = taosHashIterate(pInput->pTableRetrieveTsMap, item); } } // data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... // data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema)); pEntry->numOfRows = pInput->pData->info.rows; pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap); copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen); pBuf->useSize += (pEntry->compressed ? 
pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows); // todo completed } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) { return false; } pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows; pBuf->pData = malloc(pBuf->allocSize); return NULL != pBuf->pData; } static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL; pDispatcher->status = status; pthread_mutex_unlock(&pDispatcher->mutex); return status; } static int32_t getStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); int32_t status = pDispatcher->status; pthread_mutex_unlock(&pDispatcher->mutex); return status; } static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); *pStatus = updateStatus(pDispatcher); return TSDB_CODE_SUCCESS; } static void endPut(struct SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); pDispatcher->status = DS_END; pthread_mutex_unlock(&pDispatcher->mutex); } static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { *pStatus = 
getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS; return 0; } SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; pOutput->compressed = pEntry->compressed; tfree(pDispatcher->nextOutput.pData); // todo persistent *pStatus = updateStatus(pDispatcher); return TSDB_CODE_SUCCESS; } static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; tfree(pDispatcher->nextOutput.pData); while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); tfree(pBuf->pData); taosFreeQitem(pBuf); } taosCloseQueue(pDispatcher->pDataBlocks); pthread_mutex_destroy(&pDispatcher->mutex); } int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) { SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY; } dispatcher->sink.fPut = putDataBlock; dispatcher->sink.fGetLen = getDataLength; dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->pManager = pManager; dispatcher->schema = pDataSink->schema; dispatcher->status = DS_CAPACITY_ENOUGH; dispatcher->pDataBlocks = taosOpenQueue(); pthread_mutex_init(&dispatcher->mutex, NULL); if (NULL == dispatcher->pDataBlocks) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return 
TSDB_CODE_QRY_OUT_OF_MEMORY; } *pHandle = dispatcher; return TSDB_CODE_SUCCESS; }