未验证 提交 d8743b72 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #12569 from taosdata/fix/query_limit

fix: mem size limit
...@@ -433,7 +433,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -433,7 +433,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L); tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, 1, INT64_MAX, 0) != 0) return -1; if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, INT64_MAX, 0) != 0)
return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
......
...@@ -30,6 +30,7 @@ void Testbase::InitLog(const char* path) { ...@@ -30,6 +30,7 @@ void Testbase::InitLog(const char* path) {
tsdbDebugFlag = 0; tsdbDebugFlag = 0;
tsLogEmbedded = 1; tsLogEmbedded = 1;
tsAsyncLog = 0; tsAsyncLog = 0;
tsRpcQueueMemoryAllowed = 1024 * 1024 * 64;
taosRemoveDir(path); taosRemoveDir(path);
taosMkDir(path); taosMkDir(path);
...@@ -82,7 +83,7 @@ SRpcMsg* Testbase::SendReq(tmsg_t msgType, void* pCont, int32_t contLen) { ...@@ -82,7 +83,7 @@ SRpcMsg* Testbase::SendReq(tmsg_t msgType, void* pCont, int32_t contLen) {
return client.SendReq(&rpcMsg); return client.SendReq(&rpcMsg);
} }
int32_t Testbase::SendShowReq(int8_t showType, const char *tb, const char* db) { int32_t Testbase::SendShowReq(int8_t showType, const char* tb, const char* db) {
if (showRsp != NULL) { if (showRsp != NULL) {
rpcFreeCont(showRsp); rpcFreeCont(showRsp);
showRsp = NULL; showRsp = NULL;
......
...@@ -33,11 +33,11 @@ typedef struct STaosQnode { ...@@ -33,11 +33,11 @@ typedef struct STaosQnode {
} STaosQnode; } STaosQnode;
typedef struct STaosQueue { typedef struct STaosQueue {
STaosQnode *head; STaosQnode * head;
STaosQnode *tail; STaosQnode * tail;
STaosQueue *next; // for queue set STaosQueue * next; // for queue set
STaosQset *qset; // for queue set STaosQset * qset; // for queue set
void *ahandle; // for queue set void * ahandle; // for queue set
FItem itemFp; FItem itemFp;
FItems itemsFp; FItems itemsFp;
TdThreadMutex mutex; TdThreadMutex mutex;
...@@ -46,8 +46,8 @@ typedef struct STaosQueue { ...@@ -46,8 +46,8 @@ typedef struct STaosQueue {
} STaosQueue; } STaosQueue;
typedef struct STaosQset { typedef struct STaosQset {
STaosQueue *head; STaosQueue * head;
STaosQueue *current; STaosQueue * current;
TdThreadMutex mutex; TdThreadMutex mutex;
tsem_t sem; tsem_t sem;
int32_t numOfQueues; int32_t numOfQueues;
...@@ -85,7 +85,7 @@ void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) { ...@@ -85,7 +85,7 @@ void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
void taosCloseQueue(STaosQueue *queue) { void taosCloseQueue(STaosQueue *queue) {
if (queue == NULL) return; if (queue == NULL) return;
STaosQnode *pTemp; STaosQnode *pTemp;
STaosQset *qset; STaosQset * qset;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
STaosQnode *pNode = queue->head; STaosQnode *pNode = queue->head;
...@@ -152,7 +152,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) { ...@@ -152,7 +152,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) {
if (itype == RPC_QITEM) { if (itype == RPC_QITEM) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size); int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
if (alloced > tsRpcQueueMemoryUsed) { if (alloced > tsRpcQueueMemoryAllowed) {
taosMemoryFree(pNode); taosMemoryFree(pNode);
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
return NULL; return NULL;
......
...@@ -16,11 +16,11 @@ ...@@ -16,11 +16,11 @@
typedef struct STestMsg { typedef struct STestMsg {
uint16_t msgType; uint16_t msgType;
void *pCont; void * pCont;
int contLen; int contLen;
int32_t code; int32_t code;
void *handle; // rpc handle returned to app void * handle; // rpc handle returned to app
void *ahandle; // app handle set by client void * ahandle; // app handle set by client
int noResp; // has response or not(default 0, 0: resp, 1: no resp); int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not int persistHandle; // persist handle or not
} STestMsg; } STestMsg;
...@@ -37,7 +37,7 @@ class UtilTesProc : public ::testing::Test { ...@@ -37,7 +37,7 @@ class UtilTesProc : public ::testing::Test {
head.msgType = 2; head.msgType = 2;
head.noResp = 3; head.noResp = 3;
head.persistHandle = 4; head.persistHandle = 4;
tsRpcQueueMemoryAllowed = 1024 * 1024 * 64;
taosRemoveDir("/tmp/td"); taosRemoveDir("/tmp/td");
taosMkDir("/tmp/td"); taosMkDir("/tmp/td");
tstrncpy(tsLogDir, "/tmp/td", PATH_MAX); tstrncpy(tsLogDir, "/tmp/td", PATH_MAX);
...@@ -64,18 +64,18 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) { ...@@ -64,18 +64,18 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) {
shm.size = 1023; shm.size = 1023;
SProcCfg cfg = {(ProcConsumeFp)NULL, SProcCfg cfg = {(ProcConsumeFp)NULL,
(ProcMallocFp)taosAllocateQitem, (ProcMallocFp)taosAllocateQitem,
(ProcFreeFp)taosFreeQitem, (ProcFreeFp)taosFreeQitem,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryMalloc, (ProcFreeFp)taosMemoryMalloc,
(ProcConsumeFp)NULL, (ProcConsumeFp)NULL,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryMalloc, (ProcFreeFp)taosMemoryMalloc,
shm, shm,
&shm, &shm,
"1234"}; "1234"};
SProcObj *proc = taosProcInit(&cfg); SProcObj *proc = taosProcInit(&cfg);
ASSERT_EQ(proc, nullptr); ASSERT_EQ(proc, nullptr);
...@@ -105,18 +105,18 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ...@@ -105,18 +105,18 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
shm.size = 3000; shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0); ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0);
SProcCfg cfg = {(ProcConsumeFp)ConsumeChild1, SProcCfg cfg = {(ProcConsumeFp)ConsumeChild1,
(ProcMallocFp)taosAllocateQitem, (ProcMallocFp)taosAllocateQitem,
(ProcFreeFp)taosFreeQitem, (ProcFreeFp)taosFreeQitem,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
(ProcConsumeFp)NULL, (ProcConsumeFp)NULL,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
shm, shm,
(void *)((int64_t)1235), (void *)((int64_t)1235),
"1235_c"}; "1235_c"};
SProcObj *cproc = taosProcInit(&cfg); SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr); ASSERT_NE(cproc, nullptr);
...@@ -163,18 +163,18 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { ...@@ -163,18 +163,18 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
shm.size = 3000; shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0); ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0);
SProcCfg cfg = {(ProcConsumeFp)NULL, SProcCfg cfg = {(ProcConsumeFp)NULL,
(ProcMallocFp)taosAllocateQitem, (ProcMallocFp)taosAllocateQitem,
(ProcFreeFp)taosFreeQitem, (ProcFreeFp)taosFreeQitem,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
(ProcConsumeFp)ConsumeParent1, (ProcConsumeFp)ConsumeParent1,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
shm, shm,
(void *)((int64_t)1236), (void *)((int64_t)1236),
"1236_c"}; "1236_c"};
SProcObj *cproc = taosProcInit(&cfg); SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr); ASSERT_NE(cproc, nullptr);
...@@ -217,18 +217,18 @@ TEST_F(UtilTesProc, 03_Handle) { ...@@ -217,18 +217,18 @@ TEST_F(UtilTesProc, 03_Handle) {
shm.size = 3000; shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0); ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0);
SProcCfg cfg = {(ProcConsumeFp)ConsumeChild3, SProcCfg cfg = {(ProcConsumeFp)ConsumeChild3,
(ProcMallocFp)taosAllocateQitem, (ProcMallocFp)taosAllocateQitem,
(ProcFreeFp)taosFreeQitem, (ProcFreeFp)taosFreeQitem,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
(ProcConsumeFp)NULL, (ProcConsumeFp)NULL,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
(ProcMallocFp)taosMemoryMalloc, (ProcMallocFp)taosMemoryMalloc,
(ProcFreeFp)taosMemoryFree, (ProcFreeFp)taosMemoryFree,
shm, shm,
(void *)((int64_t)1235), (void *)((int64_t)1235),
"1237_p"}; "1237_p"};
SProcObj *cproc = taosProcInit(&cfg); SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr); ASSERT_NE(cproc, nullptr);
...@@ -236,7 +236,8 @@ TEST_F(UtilTesProc, 03_Handle) { ...@@ -236,7 +236,8 @@ TEST_F(UtilTesProc, 03_Handle) {
int32_t i = 0; int32_t i = 0;
for (i = 0; i < 20; ++i) { for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i); head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0); ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ),
0);
} }
cfg.isChild = true; cfg.isChild = true;
...@@ -247,7 +248,7 @@ TEST_F(UtilTesProc, 03_Handle) { ...@@ -247,7 +248,7 @@ TEST_F(UtilTesProc, 03_Handle) {
taosProcCleanup(pproc); taosProcCleanup(pproc);
int64_t ref = 0; int64_t ref = 0;
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)3)); ref = taosProcRemoveHandle(cproc, (void *)((int64_t)3));
EXPECT_EQ(ref, 3); EXPECT_EQ(ref, 3);
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)5)); ref = taosProcRemoveHandle(cproc, (void *)((int64_t)5));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册