diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 70c612775d857b06162319e77874f9c30e89ffbe..9e3cfbb0b0ae50b2718e52f6291ae65471c1757f 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -50,8 +50,10 @@ struct SQueryStmtInfo; typedef SSchema SSlotSchema; typedef struct SDataBlockSchema { - SSlotSchema *pSchema; - int32_t numOfCols; // number of columns + SSlotSchema *pSchema; + int32_t numOfCols; // number of columns + int32_t resultRowSize; + int16_t precision; } SDataBlockSchema; typedef struct SQueryNodeBasicInfo { @@ -61,6 +63,7 @@ typedef struct SQueryNodeBasicInfo { typedef struct SDataSink { SQueryNodeBasicInfo info; + SDataBlockSchema schema; } SDataSink; typedef struct SDataDispatcher { diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index a6f70b9e8340a81130fb677412d35f4fe9b85e33..ba941ab22d58d7dd8755898bad083aa666d15e65 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,5 +8,5 @@ target_include_directories( target_link_libraries( executor - PRIVATE os util common function parser + PRIVATE os util common function parser planner qcom ) \ No newline at end of file diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h new file mode 100644 index 0000000000000000000000000000000000000000..3f0b150c8e9f72735930f12e549bc0ecf188c016 --- /dev/null +++ b/source/libs/executor/inc/dataSinkInt.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _DATA_SINK_INT_H +#define _DATA_SINK_INT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common.h" +#include "dataSinkMgt.h" + +struct SDataSink; +struct SDataSinkHandle; + +typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes); +typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen); +typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); + +typedef struct SDataSinkHandle { + FPutDataBlock fPut; + FGetDataBlock fGet; + FDestroyDataSinker fDestroy; +} SDataSinkHandle; + +int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle); + +#ifdef __cplusplus +} +#endif + +#endif /*_DATA_SINK_INT_H*/ diff --git a/source/libs/executor/inc/dataSinkMgt.h b/source/libs/executor/inc/dataSinkMgt.h new file mode 100644 index 0000000000000000000000000000000000000000..fab59581078a943556213dfa7ba1c90b4a0651fc --- /dev/null +++ b/source/libs/executor/inc/dataSinkMgt.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _DATA_SINK_MGT_H +#define _DATA_SINK_MGT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" +#include "executorimpl.h" + +#define DS_CAPACITY_ENOUGH 1 +#define DS_CAPACITY_FULL 2 +#define DS_NEED_SCHEDULE 3 + +struct SDataSink; +struct SSDataBlock; + +typedef struct SDataSinkMgtCfg { + uint32_t maxDataBlockNum; + uint32_t maxDataBlockNumPerQuery; +} SDataSinkMgtCfg; + +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); + +typedef void* DataSinkHandle; + +typedef struct SDataResult { + SQueryCostInfo profile; + const SSDataBlock* pData; + SHashObj* pTableRetrieveTsMap; +} SDataResult; + +/** + * Create a subplan's datasinker handle for all later operations. + * @param pDataSink + * @param pHandle output + * @return error code + */ +int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle); + +/** + * Put the result set returned by the executor into datasinker. + * @param handle + * @param pRes + * @return error code + */ +int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes); + +/** + * Get the length of the data returned by the next call to dsGetDataBlock. + * @param handle + * @return data length + */ +int32_t dsGetDataLength(DataSinkHandle handle); + +/** + * Get data, the caller needs to allocate data memory. + * @param handle + * @param pData output + * @param pLen output + * @return error code + */ +int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen); + +/** + * Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call. + * @param handle + * @return datasinker status + */ +int32_t dsGetStatus(DataSinkHandle handle); + +/** + * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. + * @param ahandle + * @param pItem + */ +void dsScheduleProcess(void* ahandle, void* pItem); + +/** + * Destroy the datasinker handle. + * @param handle + */ +void dsDestroyDataSinker(DataSinkHandle handle); + +#ifdef __cplusplus +} +#endif + +#endif /*_DATA_SINK_MGT_H*/ diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c new file mode 100644 index 0000000000000000000000000000000000000000..b2c135e96df9a52d8e5a0fb22f89f855a1e5a7bd --- /dev/null +++ b/source/libs/executor/src/dataDispatcher.c @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "dataSinkInt.h" +#include "dataSinkMgt.h" +#include "planner.h" +#include "tcompression.h" +#include "tglobal.h" +#include "tqueue.h" + +#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos +#define GET_BUF_REMAIN(buf) (buf)->remain + +typedef struct SBuf { + int32_t size; + int32_t pos; + int32_t remain; + char* pData; +} SBuf; + +typedef struct SDataDispatchHandle { + SDataSinkHandle sink; + SDataBlockSchema schema; + STaosQueue* pDataBlocks; + SBuf buf; +} SDataDispatchHandle; + +static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) { + if (tsCompressColData < 0 || 0 == pData->info.rows) { + return false; + } + + for (int32_t col = 0; col < pSchema->numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); + int32_t colSize = pColRes->info.bytes * pData->info.rows; + if (NEEDTO_COMPRESS_QUERY(colSize)) { + return true; + } + } + + return false; +} + +static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { + int32_t colSize = pColRes->info.bytes * numOfRows; + return (*(tDataTypes[pColRes->info.type].compFunc))( + pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); +} + +static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) { + int32_t *compSizes = (int32_t*)data; + if (compressed) { + data += pSchema->numOfCols * sizeof(int32_t); + } + + for (int32_t col = 0; col < pSchema->numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col); + if (compressed) { + compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed); + data += compSizes[col]; + *compLen += compSizes[col]; + compSizes[col] = htonl(compSizes[col]); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows); + data += pColRes->info.bytes * pRes->pData->info.rows; + } + } + + int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap); + *(int32_t*)data = htonl(numOfTables); + data += sizeof(int32_t); + + STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL); + while (item) { + STableIdInfo* pDst = (STableIdInfo*)data; + pDst->uid = htobe64(item->uid); + pDst->key = htobe64(item->key); + data += sizeof(STableIdInfo); + item = taosHashIterate(pRes->pTableRetrieveTsMap, item); + } +} + +static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) { + SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData; + pRsp->useconds = htobe64(pRes->profile.elapsedTime); + pRsp->precision = htons(pHandle->schema.precision); + pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema)); + pRsp->numOfRows = htonl(pRes->pData->info.rows); + + *pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp); + doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen); + *pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows); + + pRsp->compLen = htonl(pRsp->compLen); + // todo completed +} + +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + int32_t useSize = 0; + toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize); +} + +static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) { + +} + +static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { + +} + +int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) { + SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle)); + if (NULL == dispatcher) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + dispatcher->sink.fPut = putDataBlock; + dispatcher->sink.fGet = getDataBlock; + dispatcher->sink.fDestroy = destroyDataSinker; + dispatcher->pDataBlocks = taosOpenQueue(); + if (NULL == dispatcher->pDataBlocks) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + *pHandle = dispatcher; + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c new file mode 100644 index 0000000000000000000000000000000000000000..2193babc76fba4ab5192e7dbc08a688097804a5e --- /dev/null +++ b/source/libs/executor/src/dataSinkMgt.c @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "dataSinkMgt.h" +#include "dataSinkInt.h" +#include "planner.h" + +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { + // todo +} + +int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) { + if (DSINK_Dispatch == pDataSink->info.type) { + return createDataDispatcher(pDataSink, pHandle); + } + return TSDB_CODE_FAILED; +} + +int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fPut(pHandleImpl, pRes); +} + +int32_t dsGetDataLength(DataSinkHandle handle) { + // todo +} + +int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fGet(pHandleImpl, pData, pLen); +} + +int32_t dsGetStatus(DataSinkHandle handle) { + // todo +} + +void dsScheduleProcess(void* ahandle, void* pItem) { + // todo +} + +void dsDestroyDataSinker(DataSinkHandle handle) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + pHandleImpl->fDestroy(pHandleImpl); +} diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 6ab53b7cd42821b4d4a10f3a7e871c94700c42ea..04c287baf1ff9317882d0ec259cf93e1785596a4 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -624,7 +624,7 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) { if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return TSDB_CODE_TSC_OUT_OF_MEMORY; } *pInfo = context.pOutput; @@ -637,5 +637,5 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) { } destroyInsertParseContext(&context); terrno = code; - return (TSDB_CODE_SUCCESS == code ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED); + return code; }