提交 068697bf 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/feature/3.0_wxy' into feature/qnode

......@@ -24,11 +24,9 @@ extern "C" {
#include "executor.h"
#include "executorimpl.h"
#define DS_CAPACITY_ENOUGH 1
#define DS_DATA_FULL 2
#define DS_NEED_SCHEDULE 3
#define DS_QUERY_END 4
#define DS_IN_PROCESS 5
#define DS_BUF_LOW 1
#define DS_BUF_FULL 2
#define DS_BUF_EMPTY 3
struct SDataSink;
struct SSDataBlock;
......@@ -45,11 +43,16 @@ typedef struct SInputData {
SHashObj* pTableRetrieveTsMap;
} SInputData;
typedef struct SOutPutData {
typedef struct SOutputData {
int32_t numOfRows;
int8_t compressed;
char* pData;
} SOutPutData;
bool queryEnd;
bool needSchedule;
int32_t bufStatus;
int64_t useconds;
int8_t precision;
} SOutputData;
/**
* Create a subplan's datasinker handle for all later operations.
......@@ -65,20 +68,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
* @param pRes
* @return error code
*/
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
/**
*
* @param handle
*/
void dsEndPut(DataSinkHandle handle);
void dsEndPut(DataSinkHandle handle, int64_t useconds);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle
* @return data length
* @param pLen data length
*/
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
/**
* Get data, the caller needs to allocate data memory.
......@@ -87,7 +86,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
* @param pStatus output
* @return error code
*/
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus);
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
......
......@@ -31,10 +31,10 @@ typedef struct SDataSinkManager {
pthread_mutex_t mutex;
} SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef struct SDataSinkHandle {
......
......@@ -42,6 +42,8 @@ typedef struct SDataDispatchHandle {
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t status;
bool queryEnd;
int64_t useconds;
pthread_mutex_t mutex;
} SDataDispatchHandle;
......@@ -124,7 +126,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
pthread_mutex_lock(&pDispatcher->mutex);
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_DATA_FULL;
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
(blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDispatcher->status = status;
pthread_mutex_unlock(&pDispatcher->mutex);
return status;
......@@ -137,7 +141,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) {
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
......@@ -145,38 +149,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
}
toDataCacheEntry(pDispatcher, pInput, pBuf);
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
*pStatus = updateStatus(pDispatcher);
*pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
return TSDB_CODE_SUCCESS;
}
static void endPut(struct SDataSinkHandle* pHandle) {
static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
pthread_mutex_lock(&pDispatcher->mutex);
pDispatcher->status = DS_QUERY_END;
pDispatcher->queryEnd = true;
pDispatcher->useconds = useconds;
pthread_mutex_unlock(&pDispatcher->mutex);
}
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pStatus = getStatus(pDispatcher) ? DS_QUERY_END : DS_IN_PROCESS;
return 0;
*pQueryEnd = pDispatcher->queryEnd;
*pLen = 0;
return;
}
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) {
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->compressed = pEntry->compressed;
tfree(pDispatcher->nextOutput.pData); // todo persistent
*pStatus = updateStatus(pDispatcher);
pOutput->bufStatus = updateStatus(pDispatcher);
pthread_mutex_lock(&pDispatcher->mutex);
pOutput->queryEnd = pDispatcher->queryEnd;
pOutput->needSchedule = false;
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
pthread_mutex_unlock(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS;
}
......@@ -205,7 +217,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS
dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pManager = pManager;
dispatcher->schema = pDataSink->schema;
dispatcher->status = DS_CAPACITY_ENOUGH;
dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false;
dispatcher->pDataBlocks = taosOpenQueue();
pthread_mutex_init(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) {
......
......@@ -32,24 +32,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
return TSDB_CODE_FAILED;
}
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) {
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fPut(pHandleImpl, pInput, pStatus);
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
}
void dsEndPut(DataSinkHandle handle) {
void dsEndPut(DataSinkHandle handle, int64_t useconds) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fEndPut(pHandleImpl);
return pHandleImpl->fEndPut(pHandleImpl, useconds);
}
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) {
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetLen(pHandleImpl, pStatus);
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);
}
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) {
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus);
return pHandleImpl->fGetData(pHandleImpl, pOutput);
}
void dsScheduleProcess(void* ahandle, void* pItem) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册