提交 6c134914 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 2b6ace65
...@@ -132,7 +132,6 @@ typedef struct { ...@@ -132,7 +132,6 @@ typedef struct {
// ref data block, for delete // ref data block, for delete
typedef struct { typedef struct {
int8_t type; int8_t type;
int64_t ver;
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SStreamRefDataBlock; } SStreamRefDataBlock;
...@@ -207,8 +206,6 @@ void* streamQueueNextItem(SStreamQueue* pQueue); ...@@ -207,8 +206,6 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit);
typedef struct { typedef struct {
char* qmsg; char* qmsg;
void* pExecutor; // not applicable to encoder and decoder void* pExecutor; // not applicable to encoder and decoder
......
...@@ -396,8 +396,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui ...@@ -396,8 +396,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
pTask->dataRange.window.skey = INT64_MIN; pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs();
mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); mDebug("s-task:0x%x set time window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
// all the source tasks dispatch result to a single agg node. // all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pDownstreamTask); setFixedDownstreamEpInfo(pTask, pDownstreamTask);
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "ttimer.h" #include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (50) #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define ONE_MB_F (1048576.0) #define ONE_MB_F (1048576.0)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) #define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
......
...@@ -164,26 +164,6 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm ...@@ -164,26 +164,6 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
return 0; return 0;
} }
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) {
int32_t len = 0;
if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
len = pSubmit->submit.msgLen;
}
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len);
if (pSubmitClone == NULL) {
return NULL;
}
streamDataSubmitRefInc(pSubmit);
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
return pSubmitClone;
}
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册