未验证 提交 5f5c48c5 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9725 from taosdata/feature/3.0_wxy

TD-12678 datasink interface adjust
...@@ -21,13 +21,11 @@ extern "C" { ...@@ -21,13 +21,11 @@ extern "C" {
#endif #endif
#include "os.h" #include "os.h"
#include "executorimpl.h" #include "thash.h"
#define DS_CAPACITY_ENOUGH 1 #define DS_BUF_LOW 1
#define DS_CAPACITY_FULL 2 #define DS_BUF_FULL 2
#define DS_NEED_SCHEDULE 3 #define DS_BUF_EMPTY 3
#define DS_END 4
#define DS_IN_PROCESS 5
struct SDataSink; struct SDataSink;
struct SSDataBlock; struct SSDataBlock;
...@@ -42,15 +40,20 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); ...@@ -42,15 +40,20 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
typedef struct SInputData { typedef struct SInputData {
const SSDataBlock* pData; const struct SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap; SHashObj* pTableRetrieveTsMap;
} SInputData; } SInputData;
typedef struct SOutPutData { typedef struct SOutputData {
int32_t numOfRows; int32_t numOfRows;
int8_t compressed; int8_t compressed;
char* pData; char* pData;
} SOutPutData; bool queryEnd;
bool needSchedule;
int32_t bufStatus;
int64_t useconds;
int8_t precision;
} SOutputData;
/** /**
* Create a subplan's datasinker handle for all later operations. * Create a subplan's datasinker handle for all later operations.
...@@ -66,16 +69,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH ...@@ -66,16 +69,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
* @param pRes * @param pRes
* @return error code * @return error code
*/ */
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
void dsEndPut(DataSinkHandle handle); void dsEndPut(DataSinkHandle handle, int64_t useconds);
/** /**
* Get the length of the data returned by the next call to dsGetDataBlock. * Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle * @param handle
* @return data length * @param pLen data length
*/ */
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
/** /**
* Get data, the caller needs to allocate data memory. * Get data, the caller needs to allocate data memory.
...@@ -84,7 +87,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); ...@@ -84,7 +87,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
* @param pStatus output * @param pStatus output
* @return error code * @return error code
*/ */
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus); int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput);
/** /**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
......
...@@ -31,10 +31,10 @@ typedef struct SDataSinkManager { ...@@ -31,10 +31,10 @@ typedef struct SDataSinkManager {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SDataSinkManager; } SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus); typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds);
typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef struct SDataSinkHandle { typedef struct SDataSinkHandle {
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tcompression.h" #include "tcompression.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "executorimpl.h"
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp)) #define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
...@@ -42,6 +43,8 @@ typedef struct SDataDispatchHandle { ...@@ -42,6 +43,8 @@ typedef struct SDataDispatchHandle {
STaosQueue* pDataBlocks; STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput; SDataDispatchBuf nextOutput;
int32_t status; int32_t status;
bool queryEnd;
int64_t useconds;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SDataDispatchHandle; } SDataDispatchHandle;
...@@ -124,7 +127,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, ...@@ -124,7 +127,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
pthread_mutex_lock(&pDispatcher->mutex); pthread_mutex_lock(&pDispatcher->mutex);
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL; int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
(blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDispatcher->status = status; pDispatcher->status = status;
pthread_mutex_unlock(&pDispatcher->mutex); pthread_mutex_unlock(&pDispatcher->mutex);
return status; return status;
...@@ -137,7 +142,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { ...@@ -137,7 +142,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
return status; return status;
} }
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
...@@ -145,38 +150,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, ...@@ -145,38 +150,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
} }
toDataCacheEntry(pDispatcher, pInput, pBuf); toDataCacheEntry(pDispatcher, pInput, pBuf);
taosWriteQitem(pDispatcher->pDataBlocks, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
*pStatus = updateStatus(pDispatcher); *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void endPut(struct SDataSinkHandle* pHandle) { static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
pthread_mutex_lock(&pDispatcher->mutex); pthread_mutex_lock(&pDispatcher->mutex);
pDispatcher->status = DS_END; pDispatcher->queryEnd = true;
pDispatcher->useconds = useconds;
pthread_mutex_unlock(&pDispatcher->mutex); pthread_mutex_unlock(&pDispatcher->mutex);
} }
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) { if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS; *pQueryEnd = pDispatcher->queryEnd;
return 0; *pLen = 0;
return;
} }
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) { static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfRows = pEntry->numOfRows;
pOutput->compressed = pEntry->compressed; pOutput->compressed = pEntry->compressed;
tfree(pDispatcher->nextOutput.pData); // todo persistent tfree(pDispatcher->nextOutput.pData); // todo persistent
*pStatus = updateStatus(pDispatcher); pOutput->bufStatus = updateStatus(pDispatcher);
pthread_mutex_lock(&pDispatcher->mutex);
pOutput->queryEnd = pDispatcher->queryEnd;
pOutput->needSchedule = false;
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
pthread_mutex_unlock(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -205,7 +218,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS ...@@ -205,7 +218,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS
dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pManager = pManager; dispatcher->pManager = pManager;
dispatcher->schema = pDataSink->schema; dispatcher->schema = pDataSink->schema;
dispatcher->status = DS_CAPACITY_ENOUGH; dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false;
dispatcher->pDataBlocks = taosOpenQueue(); dispatcher->pDataBlocks = taosOpenQueue();
pthread_mutex_init(&dispatcher->mutex, NULL); pthread_mutex_init(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) { if (NULL == dispatcher->pDataBlocks) {
......
...@@ -31,24 +31,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH ...@@ -31,24 +31,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) { int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fPut(pHandleImpl, pInput, pStatus); return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
} }
void dsEndPut(DataSinkHandle handle) { void dsEndPut(DataSinkHandle handle, int64_t useconds) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fEndPut(pHandleImpl); return pHandleImpl->fEndPut(pHandleImpl, useconds);
} }
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) { void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetLen(pHandleImpl, pStatus); pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);
} }
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) { int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus); return pHandleImpl->fGetData(pHandleImpl, pOutput);
} }
void dsScheduleProcess(void* ahandle, void* pItem) { void dsScheduleProcess(void* ahandle, void* pItem) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册