提交 f19fdaa1 编写于 作者: dengyihao's avatar dengyihao

opt trans

上级 b894ba6f
...@@ -115,8 +115,9 @@ typedef struct SRpcInit { ...@@ -115,8 +115,9 @@ typedef struct SRpcInit {
int32_t connLimitNum; int32_t connLimitNum;
int32_t connLimitLock; int32_t connLimitLock;
int8_t supportBatch; // 0: no batch, 1. batch int8_t supportBatch; // 0: no batch, 1. batch
void *parent; int32_t batchSize;
void *parent;
} SRpcInit; } SRpcInit;
typedef struct { typedef struct {
......
...@@ -286,11 +286,12 @@ int32_t dmInitClient(SDnode *pDnode) { ...@@ -286,11 +286,12 @@ int32_t dmInitClient(SDnode *pDnode) {
int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3); int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 100); connLimitNum = TMAX(connLimitNum, 100);
connLimitNum = TMIN(connLimitNum, 600); connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1; rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 64 * 1024;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
......
...@@ -67,6 +67,7 @@ typedef struct { ...@@ -67,6 +67,7 @@ typedef struct {
int32_t connLimitNum; int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize;
int index; int index;
void* parent; void* parent;
......
...@@ -70,6 +70,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -70,6 +70,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->connLimitNum = pInit->connLimitNum; pRpc->connLimitNum = pInit->connLimitNum;
pRpc->connLimitLock = pInit->connLimitLock; pRpc->connLimitLock = pInit->connLimitLock;
pRpc->supportBatch = pInit->supportBatch; pRpc->supportBatch = pInit->supportBatch;
pRpc->batchSize = pInit->batchSize;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) { if (pRpc->numOfThreads <= 0) {
......
...@@ -27,6 +27,7 @@ typedef struct { ...@@ -27,6 +27,7 @@ typedef struct {
int connMax; int connMax;
int connCnt; int connCnt;
int batchLenLimit; int batchLenLimit;
int sending;
char* dst; char* dst;
char* ip; char* ip;
...@@ -992,6 +993,8 @@ static void cliDestroyBatch(SCliBatch* pBatch) { ...@@ -992,6 +993,8 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); SCliMsg* p = QUEUE_DATA(h, SCliMsg, q);
destroyCmsg(p); destroyCmsg(p);
} }
SCliBatchList* p = pBatch->pList;
p->sending -= 1;
taosMemoryFree(pBatch); taosMemoryFree(pBatch);
} }
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
...@@ -1461,11 +1464,12 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1461,11 +1464,12 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
} }
} }
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt >= pList->connMax) { if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
return NULL; return NULL;
} }
queue* hr = QUEUE_HEAD(&pList->wq); queue* hr = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(hr); QUEUE_REMOVE(hr);
pList->sending += 1;
pList->len -= 1; pList->len -= 1;
...@@ -1474,6 +1478,8 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { ...@@ -1474,6 +1478,8 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
} }
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
STrans* pInst = pThrd->pTransInst;
int count = 0; int count = 0;
while (!QUEUE_IS_EMPTY(wq)) { while (!QUEUE_IS_EMPTY(wq)) {
queue* h = QUEUE_HEAD(wq); queue* h = QUEUE_HEAD(wq);
...@@ -1493,9 +1499,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1493,9 +1499,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
if (ppBatchList == NULL || *ppBatchList == NULL) { if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
QUEUE_INIT(&pBatchList->wq); QUEUE_INIT(&pBatchList->wq);
pBatchList->connMax = 200; pBatchList->connMax = pInst->connLimitNum;
pBatchList->connCnt = 0; pBatchList->connCnt = 0;
pBatchList->batchLenLimit = 16 * 1024; pBatchList->batchLenLimit = pInst->batchSize;
pBatchList->ip = strdup(ip); pBatchList->ip = strdup(ip);
pBatchList->dst = strdup(key); pBatchList->dst = strdup(key);
pBatchList->port = port; pBatchList->port = port;
......
...@@ -114,8 +114,9 @@ int main(int argc, char *argv[]) { ...@@ -114,8 +114,9 @@ int main(int argc, char *argv[]) {
rpcInit.user = "michael"; rpcInit.user = "michael";
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.connLimitNum = 300; rpcInit.connLimitNum = 10;
rpcInit.connLimitLock = 1; rpcInit.connLimitLock = 1;
rpcInit.batchSize = 16 * 1024;
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcDebugFlag = 135; rpcDebugFlag = 135;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册