提交 628bb62c 编写于 作者: dengyihao's avatar dengyihao

add backpressure

上级 6ee01802
......@@ -131,3 +131,4 @@ tools/BUGS
tools/taos-tools
tools/taosws-rs
tags
.clangd
......@@ -204,7 +204,7 @@ typedef struct {
int32_t streamInit();
void streamCleanUp();
SStreamQueue* streamQueueOpen();
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* queue);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
......@@ -374,7 +374,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
int32_t code = 0;
int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) {
......@@ -385,19 +386,20 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone,
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
code = taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
code = taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
code = taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
code = taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
}
if (code != 0) return code;
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
......@@ -637,9 +639,9 @@ typedef struct SStreamMeta {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
// SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
......
......@@ -761,11 +761,15 @@ int32_t* taosGetErrno();
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
#define TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE TAOS_DEF_ERROR_CODE(0, 0x4101)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
#define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x5101)
// UTIL
#define TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x6000)
#ifdef __cplusplus
}
#endif
......
......@@ -84,6 +84,8 @@ typedef struct STaosQueue {
int64_t memOfItems;
int32_t numOfItems;
int64_t threadId;
int64_t memLimit;
int64_t itemLimit;
} STaosQueue;
typedef struct STaosQset {
......@@ -106,12 +108,14 @@ void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize);
void taosFreeQitem(void *pItem);
void taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(STaosQueue *queue);
void taosUpdateItemSize(STaosQueue *queue, int32_t items);
int32_t taosQueueItemSize(STaosQueue *queue);
int64_t taosQueueMemorySize(STaosQueue *queue);
void taosSetQueueCapacity(STaosQueue *queue, int64_t size);
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t mem);
STaosQall *taosAllocateQall();
void taosFreeQall(STaosQall *qall);
......
......@@ -66,8 +66,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
pTask->inputQueue = streamQueueOpen(0);
pTask->outputQueue = streamQueueOpen(0);
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
return -1;
......
......@@ -934,8 +934,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
pTask->inputQueue = streamQueueOpen(128 << 10);
pTask->outputQueue = streamQueueOpen(128 << 10);
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
return -1;
......
......@@ -125,6 +125,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
int32_t code = 0;
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0);
if (NULL == pBuf) {
......@@ -137,7 +138,10 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
}
toDataCacheEntry(pDispatcher, pInput, pBuf);
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
code = taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
if (code != 0) {
return code;
}
int32_t status = updateStatus(pDispatcher);
*pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY);
......
......@@ -188,6 +188,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
}
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0;
if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
......@@ -198,7 +199,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
taosFreeQitem(pBlock);
} else {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
taosWriteQitem(pTask->outputQueue->queue, pBlock);
code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code != 0) {
return code;
}
streamDispatch(pTask);
}
return 0;
......
......@@ -20,7 +20,7 @@
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code;
void* exec = pTask->exec.executor;
while(atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
while (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
qError("stream task wait for the end of fill history");
taosMsleep(2);
continue;
......@@ -105,8 +105,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
}
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
int32_t code = 0;
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.executor;
qSetStreamOpOpen(exec);
......@@ -164,7 +165,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
streamTaskOutput(pTask, qRes);
code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosFreeQitem(pRes);
return code;
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("task %d scan exec dispatch block num %d", pTask->taskId, batchCnt);
......@@ -214,6 +219,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
#endif
int32_t streamExecForAll(SStreamTask* pTask) {
int32_t code = 0;
while (1) {
int32_t batchCnt = 1;
void* input = NULL;
......@@ -256,7 +262,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, input);
code = streamTaskOutput(pTask, input);
if (code != 0) {
// backpressure and record position
}
continue;
}
......@@ -286,7 +295,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qRes->sourceVer = pMerged->ver;
}
if (streamTaskOutput(pTask, qRes) < 0) {
code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
// backpressure and record position
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(input);
taosFreeQitem(qRes);
......
......@@ -15,7 +15,7 @@
#include "streamInc.h"
SStreamQueue* streamQueueOpen() {
SStreamQueue* streamQueueOpen(int64_t cap) {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
if (pQueue == NULL) return NULL;
pQueue->queue = taosOpenQueue();
......@@ -24,6 +24,8 @@ SStreamQueue* streamQueueOpen() {
goto FAIL;
}
pQueue->status = STREAM_QUEUE__SUCESS;
taosSetQueueCapacity(pQueue->queue, cap);
taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024);
return pQueue;
FAIL:
if (pQueue->queue) taosCloseQueue(pQueue->queue);
......
......@@ -632,11 +632,16 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE,"Out of memory in stream queue")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
// UTIL
TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory")
#ifdef TAOS_ERROR_C
};
#endif
......
......@@ -21,6 +21,9 @@
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
STaosQueue *taosOpenQueue() {
STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
if (queue == NULL) {
......@@ -153,11 +156,26 @@ void taosFreeQitem(void *pItem) {
taosMemoryFree(pNode);
}
void taosWriteQitem(STaosQueue *queue, void *pItem) {
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
int32_t code = 0;
STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
pNode->next = NULL;
taosThreadMutexLock(&queue->mutex);
if (queue->memLimit > 0 && queue->memOfItems + pNode->size > queue->memLimit) {
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
queue->memLimit, tstrerror(code));
taosThreadMutexUnlock(&queue->mutex);
return code;
} else if (queue->itemLimit > 0 && queue->itemLimit + 1 > queue->itemLimit) {
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
queue->itemLimit, tstrerror(code));
taosThreadMutexUnlock(&queue->mutex);
return code;
}
if (queue->tail) {
queue->tail->next = pNode;
......@@ -166,15 +184,16 @@ void taosWriteQitem(STaosQueue *queue, void *pItem) {
queue->head = pNode;
queue->tail = pNode;
}
queue->numOfItems++;
queue->memOfItems += pNode->size;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
taosThreadMutexUnlock(&queue->mutex);
if (queue->qset) tsem_post(&queue->qset->sem);
return code;
}
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册