From 6f02179de0d18f2aa6a33220ea622ebcb1d3aeac Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 7 Jan 2022 15:02:33 -0500 Subject: [PATCH] TD-12760 simple memory management code. --- include/libs/qcom/query.h | 2 +- include/util/tqueue.h | 1 + source/libs/executor/inc/dataSinkInt.h | 17 ++- source/libs/executor/inc/dataSinkMgt.h | 32 ++--- source/libs/executor/src/dataDispatcher.c | 151 ++++++++++++++++------ source/libs/executor/src/dataSinkMgt.c | 25 ++-- source/util/src/tqueue.c | 7 + 7 files changed, 168 insertions(+), 67 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index e1eef1c3f5..0d5792fd91 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -87,7 +87,7 @@ typedef struct SUseDbOutput { SDBVgroupInfo dbVgroup; } SUseDbOutput; -typedef enum { +enum { META_TYPE_NON_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, diff --git a/include/util/tqueue.h b/include/util/tqueue.h index a57bdb5ce8..63ba460d39 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -51,6 +51,7 @@ void taosFreeQitem(void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); +int32_t taosQueueSize(STaosQueue *queue); STaosQall *taosAllocateQall(); void taosFreeQall(STaosQall *qall); diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 3f0b150c8e..1bbf5494dd 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -26,17 +26,26 @@ extern "C" { struct SDataSink; struct SDataSinkHandle; -typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes); -typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen); +typedef struct SDataSinkManager { + SDataSinkMgtCfg cfg; + pthread_mutex_t mutex; +} SDataSinkManager; + +typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus); +typedef void (*FEndPut)(struct SDataSinkHandle* pHandle); +typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus); +typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef struct SDataSinkHandle { FPutDataBlock fPut; - FGetDataBlock fGet; + FEndPut fEndPut; + FGetDataLength fGetLen; + FGetDataBlock fGetData; FDestroyDataSinker fDestroy; } SDataSinkHandle; -int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle); +int32_t createDataDispatcher(SDataSinkManager* pManager, const struct SDataSink* pDataSink, DataSinkHandle* pHandle); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/dataSinkMgt.h b/source/libs/executor/inc/dataSinkMgt.h index fab5958107..d13423b25d 100644 --- a/source/libs/executor/inc/dataSinkMgt.h +++ b/source/libs/executor/inc/dataSinkMgt.h @@ -26,6 +26,8 @@ extern "C" { #define DS_CAPACITY_ENOUGH 1 #define DS_CAPACITY_FULL 2 #define DS_NEED_SCHEDULE 3 +#define DS_END 4 +#define DS_IN_PROCESS 5 struct SDataSink; struct SSDataBlock; @@ -39,11 +41,16 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); typedef void* DataSinkHandle; -typedef struct SDataResult { - SQueryCostInfo profile; +typedef struct SInputData { const SSDataBlock* pData; SHashObj* pTableRetrieveTsMap; -} SDataResult; +} SInputData; + +typedef struct SOutPutData { + int32_t numOfRows; + int8_t compressed; + char* pData; +} SOutPutData; /** * Create a subplan's datasinker handle for all later operations. @@ -59,30 +66,25 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH * @param pRes * @return error code */ -int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes); +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); + +void dsEndPut(DataSinkHandle handle); /** * Get the length of the data returned by the next call to dsGetDataBlock. * @param handle * @return data length */ -int32_t dsGetDataLength(DataSinkHandle handle); +int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); /** * Get data, the caller needs to allocate data memory. * @param handle - * @param pData output - * @param pLen output + * @param pOutput output + * @param pStatus output * @return error code */ -int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen); - -/** - * Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call. - * @param handle - * @return datasinker status - */ -int32_t dsGetStatus(DataSinkHandle handle); +int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus); /** * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index b2c135e96d..3d8e51d04d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -20,21 +20,29 @@ #include "tglobal.h" #include "tqueue.h" -#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos -#define GET_BUF_REMAIN(buf) (buf)->remain +#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp)) -typedef struct SBuf { - int32_t size; - int32_t pos; - int32_t remain; +typedef struct SDataDispatchBuf { + int32_t useSize; + int32_t allocSize; char* pData; -} SBuf; +} SDataDispatchBuf; + +typedef struct SDataCacheEntry { + int32_t dataLen; + int32_t numOfRows; + int8_t compressed; + char data[]; +} SDataCacheEntry; typedef struct SDataDispatchHandle { SDataSinkHandle sink; + SDataSinkManager* pManager; SDataBlockSchema schema; STaosQueue* pDataBlocks; - SBuf buf; + SDataDispatchBuf nextOutput; + int32_t status; + pthread_mutex_t mutex; } SDataDispatchHandle; static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) { @@ -53,87 +61,156 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSche return false; } -static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { +static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { int32_t colSize = pColRes->info.bytes * numOfRows; return (*(tDataTypes[pColRes->info.type].compFunc))( pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) { +static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) { int32_t *compSizes = (int32_t*)data; if (compressed) { data += pSchema->numOfCols * sizeof(int32_t); } for (int32_t col = 0; col < pSchema->numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col); + SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col); if (compressed) { - compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed); + compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed); data += compSizes[col]; *compLen += compSizes[col]; compSizes[col] = htonl(compSizes[col]); } else { - memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows); - data += pColRes->info.bytes * pRes->pData->info.rows; + memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows); + data += pColRes->info.bytes * pInput->pData->info.rows; } } - int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap); + int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); - STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL); + STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL); while (item) { STableIdInfo* pDst = (STableIdInfo*)data; pDst->uid = htobe64(item->uid); pDst->key = htobe64(item->key); data += sizeof(STableIdInfo); - item = taosHashIterate(pRes->pTableRetrieveTsMap, item); + item = taosHashIterate(pInput->pTableRetrieveTsMap, item); } } -static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) { - SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData; - pRsp->useconds = htobe64(pRes->profile.elapsedTime); - pRsp->precision = htons(pHandle->schema.precision); - pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema)); - pRsp->numOfRows = htonl(pRes->pData->info.rows); - - *pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp); - doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen); - *pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows); +// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... +// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... +static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { + SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; + pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema)); + pEntry->numOfRows = pInput->pData->info.rows; - pRsp->compLen = htonl(pRsp->compLen); + pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap); + copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen); + pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows); // todo completed } -static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) { +static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { + if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) { + return false; + } + pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows; + pBuf->pData = malloc(pBuf->allocSize); + return NULL != pBuf->pData; +} + +static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { + pthread_mutex_lock(&pDispatcher->mutex); + int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL; + pDispatcher->status = status; + pthread_mutex_unlock(&pDispatcher->mutex); + return status; +} + +static int32_t getStatus(SDataDispatchHandle* pDispatcher) { + pthread_mutex_lock(&pDispatcher->mutex); + int32_t status = pDispatcher->status; + pthread_mutex_unlock(&pDispatcher->mutex); + return status; +} + +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - int32_t useSize = 0; - toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize); + SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); + if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + toDataCacheEntry(pDispatcher, pInput, pBuf); + taosWriteQitem(pDispatcher->pDataBlocks, pBuf); + *pStatus = updateStatus(pDispatcher); + return TSDB_CODE_SUCCESS; } -static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) { +static void endPut(struct SDataSinkHandle* pHandle) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + pthread_mutex_lock(&pDispatcher->mutex); + pDispatcher->status = DS_END; + pthread_mutex_unlock(&pDispatcher->mutex); +} +static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + if (taosQueueEmpty(pDispatcher->pDataBlocks)) { + *pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS; + return 0; + } + SDataDispatchBuf* pBuf = NULL; + taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); + memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); + taosFreeQitem(pBuf); + return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; } -static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { +static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); + memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); + pOutput->numOfRows = pEntry->numOfRows; + pOutput->compressed = pEntry->compressed; + tfree(pDispatcher->nextOutput.pData); // todo persistent + *pStatus = updateStatus(pDispatcher); + return TSDB_CODE_SUCCESS; +} +static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + tfree(pDispatcher->nextOutput.pData); + while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { + SDataDispatchBuf* pBuf = NULL; + taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); + tfree(pBuf->pData); + taosFreeQitem(pBuf); + } + taosCloseQueue(pDispatcher->pDataBlocks); + pthread_mutex_destroy(&pDispatcher->mutex); } -int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) { +int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) { SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return TSDB_CODE_QRY_OUT_OF_MEMORY; } dispatcher->sink.fPut = putDataBlock; - dispatcher->sink.fGet = getDataBlock; + dispatcher->sink.fGetLen = getDataLength; + dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; + dispatcher->pManager = pManager; + dispatcher->schema = pDataSink->schema; + dispatcher->status = DS_CAPACITY_ENOUGH; dispatcher->pDataBlocks = taosOpenQueue(); + pthread_mutex_init(&dispatcher->mutex, NULL); if (NULL == dispatcher->pDataBlocks) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return TSDB_CODE_QRY_OUT_OF_MEMORY; } *pHandle = dispatcher; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 2193babc76..8a96c5d05f 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -17,33 +17,38 @@ #include "dataSinkInt.h" #include "planner.h" +static SDataSinkManager gDataSinkManager = {0}; + int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { - // todo + gDataSinkManager.cfg = *cfg; + pthread_mutex_init(&gDataSinkManager.mutex, NULL); } int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) { if (DSINK_Dispatch == pDataSink->info.type) { - return createDataDispatcher(pDataSink, pHandle); + return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); } return TSDB_CODE_FAILED; } -int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) { +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fPut(pHandleImpl, pRes); + return pHandleImpl->fPut(pHandleImpl, pInput, pStatus); } -int32_t dsGetDataLength(DataSinkHandle handle) { - // todo +void dsEndPut(DataSinkHandle handle) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fEndPut(pHandleImpl); } -int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) { +int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGet(pHandleImpl, pData, pLen); + return pHandleImpl->fGetLen(pHandleImpl, pStatus); } -int32_t dsGetStatus(DataSinkHandle handle) { - // todo +int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus); } void dsScheduleProcess(void* ahandle, void* pItem) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 75f5e9cdbc..5cb149d53c 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -112,6 +112,13 @@ bool taosQueueEmpty(STaosQueue *queue) { return empty; } +int32_t taosQueueSize(STaosQueue *queue) { + pthread_mutex_lock(&queue->mutex); + int32_t numOfItems = queue->numOfItems; + pthread_mutex_unlock(&queue->mutex); + return numOfItems; +} + void *taosAllocateQitem(int32_t size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); -- GitLab