提交 6f02179d 编写于 作者: X Xiaoyu Wang

TD-12760 simple memory management code.

上级 0654fbfe
......@@ -87,7 +87,7 @@ typedef struct SUseDbOutput {
SDBVgroupInfo dbVgroup;
} SUseDbOutput;
typedef enum {
enum {
META_TYPE_NON_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
......
......@@ -51,6 +51,7 @@ void taosFreeQitem(void *pItem);
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(STaosQueue *queue);
int32_t taosQueueSize(STaosQueue *queue);
STaosQall *taosAllocateQall();
void taosFreeQall(STaosQall *qall);
......
......@@ -26,17 +26,26 @@ extern "C" {
struct SDataSink;
struct SDataSinkHandle;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen);
typedef struct SDataSinkManager {
SDataSinkMgtCfg cfg;
pthread_mutex_t mutex;
} SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef struct SDataSinkHandle {
FPutDataBlock fPut;
FGetDataBlock fGet;
FEndPut fEndPut;
FGetDataLength fGetLen;
FGetDataBlock fGetData;
FDestroyDataSinker fDestroy;
} SDataSinkHandle;
int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDispatcher(SDataSinkManager* pManager, const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
#ifdef __cplusplus
}
......
......@@ -26,6 +26,8 @@ extern "C" {
#define DS_CAPACITY_ENOUGH 1
#define DS_CAPACITY_FULL 2
#define DS_NEED_SCHEDULE 3
#define DS_END 4
#define DS_IN_PROCESS 5
struct SDataSink;
struct SSDataBlock;
......@@ -39,11 +41,16 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef void* DataSinkHandle;
typedef struct SDataResult {
SQueryCostInfo profile;
typedef struct SInputData {
const SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap;
} SDataResult;
} SInputData;
typedef struct SOutPutData {
int32_t numOfRows;
int8_t compressed;
char* pData;
} SOutPutData;
/**
* Create a subplan's datasinker handle for all later operations.
......@@ -59,30 +66,25 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
* @param pRes
* @return error code
*/
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes);
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);
void dsEndPut(DataSinkHandle handle);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle
* @return data length
*/
int32_t dsGetDataLength(DataSinkHandle handle);
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
/**
* Get data, the caller needs to allocate data memory.
* @param handle
* @param pData output
* @param pLen output
* @param pOutput output
* @param pStatus output
* @return error code
*/
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen);
/**
* Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call.
* @param handle
* @return datasinker status
*/
int32_t dsGetStatus(DataSinkHandle handle);
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
......
......@@ -20,21 +20,29 @@
#include "tglobal.h"
#include "tqueue.h"
#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos
#define GET_BUF_REMAIN(buf) (buf)->remain
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
typedef struct SBuf {
int32_t size;
int32_t pos;
int32_t remain;
typedef struct SDataDispatchBuf {
int32_t useSize;
int32_t allocSize;
char* pData;
} SBuf;
} SDataDispatchBuf;
typedef struct SDataCacheEntry {
int32_t dataLen;
int32_t numOfRows;
int8_t compressed;
char data[];
} SDataCacheEntry;
typedef struct SDataDispatchHandle {
SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataBlockSchema schema;
STaosQueue* pDataBlocks;
SBuf buf;
SDataDispatchBuf nextOutput;
int32_t status;
pthread_mutex_t mutex;
} SDataDispatchHandle;
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
......@@ -53,87 +61,156 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSche
return false;
}
static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = pColRes->info.bytes * numOfRows;
return (*(tDataTypes[pColRes->info.type].compFunc))(
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
int32_t *compSizes = (int32_t*)data;
if (compressed) {
data += pSchema->numOfCols * sizeof(int32_t);
}
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col);
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
if (compressed) {
compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed);
compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
data += compSizes[col];
*compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows);
data += pColRes->info.bytes * pRes->pData->info.rows;
memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows);
data += pColRes->info.bytes * pInput->pData->info.rows;
}
}
int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap);
int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL);
STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
while (item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
item = taosHashIterate(pRes->pTableRetrieveTsMap, item);
item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
}
}
static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) {
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData;
pRsp->useconds = htobe64(pRes->profile.elapsedTime);
pRsp->precision = htons(pHandle->schema.precision);
pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema));
pRsp->numOfRows = htonl(pRes->pData->info.rows);
*pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp);
doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen);
*pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows);
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
pEntry->numOfRows = pInput->pData->info.rows;
pRsp->compLen = htonl(pRsp->compLen);
pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows);
// todo completed
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) {
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
return false;
}
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
pBuf->pData = malloc(pBuf->allocSize);
return NULL != pBuf->pData;
}
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
pthread_mutex_lock(&pDispatcher->mutex);
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL;
pDispatcher->status = status;
pthread_mutex_unlock(&pDispatcher->mutex);
return status;
}
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
pthread_mutex_lock(&pDispatcher->mutex);
int32_t status = pDispatcher->status;
pthread_mutex_unlock(&pDispatcher->mutex);
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
int32_t useSize = 0;
toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize);
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
toDataCacheEntry(pDispatcher, pInput, pBuf);
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
*pStatus = updateStatus(pDispatcher);
return TSDB_CODE_SUCCESS;
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) {
static void endPut(struct SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
pthread_mutex_lock(&pDispatcher->mutex);
pDispatcher->status = DS_END;
pthread_mutex_unlock(&pDispatcher->mutex);
}
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS;
return 0;
}
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->compressed = pEntry->compressed;
tfree(pDispatcher->nextOutput.pData); // todo persistent
*pStatus = updateStatus(pDispatcher);
return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
tfree(pDispatcher->nextOutput.pData);
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
tfree(pBuf->pData);
taosFreeQitem(pBuf);
}
taosCloseQueue(pDispatcher->pDataBlocks);
pthread_mutex_destroy(&pDispatcher->mutex);
}
int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) {
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) {
SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fGet = getDataBlock;
dispatcher->sink.fGetLen = getDataLength;
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pManager = pManager;
dispatcher->schema = pDataSink->schema;
dispatcher->status = DS_CAPACITY_ENOUGH;
dispatcher->pDataBlocks = taosOpenQueue();
pthread_mutex_init(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
*pHandle = dispatcher;
return TSDB_CODE_SUCCESS;
......
......@@ -17,33 +17,38 @@
#include "dataSinkInt.h"
#include "planner.h"
static SDataSinkManager gDataSinkManager = {0};
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
// todo
gDataSinkManager.cfg = *cfg;
pthread_mutex_init(&gDataSinkManager.mutex, NULL);
}
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) {
if (DSINK_Dispatch == pDataSink->info.type) {
return createDataDispatcher(pDataSink, pHandle);
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
}
return TSDB_CODE_FAILED;
}
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) {
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fPut(pHandleImpl, pRes);
return pHandleImpl->fPut(pHandleImpl, pInput, pStatus);
}
int32_t dsGetDataLength(DataSinkHandle handle) {
// todo
void dsEndPut(DataSinkHandle handle) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fEndPut(pHandleImpl);
}
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) {
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGet(pHandleImpl, pData, pLen);
return pHandleImpl->fGetLen(pHandleImpl, pStatus);
}
int32_t dsGetStatus(DataSinkHandle handle) {
// todo
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus);
}
void dsScheduleProcess(void* ahandle, void* pItem) {
......
......@@ -112,6 +112,13 @@ bool taosQueueEmpty(STaosQueue *queue) {
return empty;
}
int32_t taosQueueSize(STaosQueue *queue) {
pthread_mutex_lock(&queue->mutex);
int32_t numOfItems = queue->numOfItems;
pthread_mutex_unlock(&queue->mutex);
return numOfItems;
}
void *taosAllocateQitem(int32_t size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册