提交 eabc5511 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

change the code to accomodate the changes by tqueue

上级 fc68ac02
...@@ -90,7 +90,7 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -90,7 +90,7 @@ void dnodeRead(SRpcMsg *pMsg) {
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1;//htonl(pHead->vgId); pHead->vgId = 1; //htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId); void *pVnode = dnodeGetVnode(pHead->vgId);
...@@ -101,22 +101,19 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -101,22 +101,19 @@ void dnodeRead(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SReadMsg readMsg; SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
readMsg.rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
readMsg.pCont = pCont; pRead->pCont = pCont;
readMsg.contLen = pHead->contLen; pRead->contLen = pHead->contLen;
readMsg.pRpcContext = pRpcContext; pRead->pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
taos_queue queue = dnodeGetVnodeRworker(pVnode); taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, 0, pReadMsg); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// next vnode // next vnode
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
pCont -= pHead->contLen; pCont -= pHead->contLen;
queuedMsgNum++; queuedMsgNum++;
dnodeReleaseVnode(pVnode);
} }
if (queuedMsgNum == 0) { if (queuedMsgNum == 0) {
...@@ -179,6 +176,8 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -179,6 +176,8 @@ static void *dnodeProcessReadQueue(void *param) {
dnodeProcessReadResult(pVnode, pReadMsg); dnodeProcessReadResult(pVnode, pReadMsg);
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
dnodeReleaseVnode(pVnode);
} }
return NULL; return NULL;
......
...@@ -115,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -115,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SWriteMsg writeMsg; SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
writeMsg.rpcMsg = *pMsg; pWrite->rpcMsg = *pMsg;
writeMsg.pCont = pCont; pWrite->pCont = pCont;
writeMsg.contLen = pHead->contLen; pWrite->contLen = pHead->contLen;
writeMsg.pRpcContext = pRpcContext; pWrite->pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
taos_queue queue = dnodeGetVnodeWworker(pVnode); taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg); taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
// next vnode // next vnode
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
...@@ -145,14 +144,14 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -145,14 +144,14 @@ void dnodeWrite(SRpcMsg *pMsg) {
void *dnodeAllocateWriteWorker(void *pVnode) { void *dnodeAllocateWriteWorker(void *pVnode) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); taos_queue *queue = taosOpenQueue();
if (queue == NULL) return NULL; if (queue == NULL) return NULL;
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL; if (pWorker->qset == NULL) return NULL;
taosAddIntoQset(pWorker->qset, queue); taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
pthread_attr_t thAttr; pthread_attr_t thAttr;
...@@ -164,7 +163,7 @@ void *dnodeAllocateWriteWorker(void *pVnode) { ...@@ -164,7 +163,7 @@ void *dnodeAllocateWriteWorker(void *pVnode) {
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
} }
} else { } else {
taosAddIntoQset(pWorker->qset, queue); taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
} }
...@@ -185,8 +184,10 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -185,8 +184,10 @@ static void *dnodeProcessWriteQueue(void *param) {
int type; int type;
void *pVnode; void *pVnode;
qall = taosAllocateQall();
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode);
if (numOfMsgs <=0) { if (numOfMsgs <=0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
continue; continue;
...@@ -215,12 +216,13 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -215,12 +216,13 @@ static void *dnodeProcessWriteQueue(void *param) {
} }
dnodeProcessWriteResult(pVnode, pWriteMsg); dnodeProcessWriteResult(pVnode, pWriteMsg);
taosFreeQitem(pWriteMsg);
} }
// free the Qitems;
taosFreeQitems(qall);
} }
taosFreeQall(qall);
return NULL; return NULL;
} }
......
...@@ -28,10 +28,13 @@ void *qhandle = NULL; ...@@ -28,10 +28,13 @@ void *qhandle = NULL;
void processShellMsg() { void processShellMsg() {
static int num = 0; static int num = 0;
taos_qall qall; taos_qall qall;
SRpcMsg rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, &qall); int numOfMsgs = taosReadAllQitems(qhandle, qall);
if (numOfMsgs <= 0) { if (numOfMsgs <= 0) {
usleep(1000); usleep(1000);
continue; continue;
...@@ -40,10 +43,10 @@ void processShellMsg() { ...@@ -40,10 +43,10 @@ void processShellMsg() {
tTrace("%d shell msgs are received", numOfMsgs); tTrace("%d shell msgs are received", numOfMsgs);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg); taosGetQitem(qall, &type, (void **)&pRpcMsg);
if (dataFd >=0) { if (dataFd >=0) {
if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno)); tPrint("failed to write data file, reason:%s", strerror(errno));
} }
} }
...@@ -62,19 +65,22 @@ void processShellMsg() { ...@@ -62,19 +65,22 @@ void processShellMsg() {
taosResetQitems(qall); taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg);
rpcFreeCont(rpcMsg.pCont); taosGetQitem(qall, &type, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = rpcMsg.handle; rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 1; rpcMsg.code = 1;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg);
} }
taosFreeQitems(qall);
} }
taosFreeQall(qall);
/* /*
SRpcIpSet ipSet; SRpcIpSet ipSet;
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
...@@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char ...@@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
} }
void processRequestMsg(SRpcMsg *pMsg) { void processRequestMsg(SRpcMsg *pMsg) {
tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); SRpcMsg *pTemp;
taosWriteQitem(qhandle, pMsg);
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
...@@ -143,6 +154,7 @@ int main(int argc, char *argv[]) { ...@@ -143,6 +154,7 @@ int main(int argc, char *argv[]) {
commit = atoi(argv[++i]); commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) { } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]); rpcDebugFlag = atoi(argv[++i]);
ddebugFlag = rpcDebugFlag;
uDebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag;
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
......
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
extern "C" { extern "C" {
#endif #endif
#define TAOS_QTYPE_RPC 0
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
typedef void* taos_queue; typedef void* taos_queue;
typedef void* taos_qset; typedef void* taos_qset;
typedef void* taos_qall; typedef void* taos_qall;
...@@ -31,10 +35,11 @@ void taosFreeQitem(void *item); ...@@ -31,10 +35,11 @@ void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item); int taosWriteQitem(taos_queue, int type, void *item);
int taosReadQitem(taos_queue, int *type, void **pitem); int taosReadQitem(taos_queue, int *type, void **pitem);
int taosReadAllQitems(taos_queue, taos_qall *); taos_qall taosAllocateQall();
void taosFreeQall(taos_qall);
int taosReadAllQitems(taos_queue, taos_qall);
int taosGetQitem(taos_qall, int *type, void **pitem); int taosGetQitem(taos_qall, int *type, void **pitem);
void taosResetQitems(taos_qall); void taosResetQitems(taos_qall);
void taosFreeQitems(taos_qall);
taos_qset taosOpenQset(); taos_qset taosOpenQset();
void taosCloseQset(); void taosCloseQset();
...@@ -43,7 +48,7 @@ void taosRemoveFromQset(taos_qset, taos_queue); ...@@ -43,7 +48,7 @@ void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset); int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle); int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
int taosReadAllQitemsFromQset(taos_qset, taos_qall *, void **handle); int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle);
int taosGetQueueItemsNumber(taos_queue param); int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param); int taosGetQsetItemsNumber(taos_qset param);
......
...@@ -92,6 +92,8 @@ void *taosAllocateQitem(int size) { ...@@ -92,6 +92,8 @@ void *taosAllocateQitem(int size) {
void taosFreeQitem(void *param) { void taosFreeQitem(void *param) {
if (param == NULL) return; if (param == NULL) return;
//pTrace("item:%p is freed", param);
char *temp = (char *)param; char *temp = (char *)param;
temp -= sizeof(STaosQnode); temp -= sizeof(STaosQnode);
free(temp); free(temp);
...@@ -115,6 +117,8 @@ int taosWriteQitem(taos_queue param, int type, void *item) { ...@@ -115,6 +117,8 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue->numOfItems++; queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
//pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
return 0; return 0;
...@@ -137,6 +141,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { ...@@ -137,6 +141,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
queue->numOfItems--; queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
//pTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
...@@ -144,35 +149,38 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { ...@@ -144,35 +149,38 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
return code; return code;
} }
int taosReadAllQitems(taos_queue param, taos_qall *res) { void *taosAllocateQall() {
void *p = malloc(sizeof(STaosQall));
return p;
}
void taosFreeQall(void *param) {
free(param);
}
int taosReadAllQitems(taos_queue param, taos_qall p2) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQall *qall = NULL; STaosQall *qall = (STaosQall *)p2;
int code = 0; int code = 0;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1); memset(qall, 0, sizeof(STaosQall));
if ( qall == NULL ) { qall->current = queue->head;
terrno = TSDB_CODE_NO_RESOURCE; qall->start = queue->head;
code = -1; qall->numOfItems = queue->numOfItems;
} else { qall->itemSize = queue->itemSize;
qall->current = queue->head; code = qall->numOfItems;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems; queue->head = NULL;
qall->itemSize = queue->itemSize; queue->tail = NULL;
code = qall->numOfItems; queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
}
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
*res = qall;
return code; return code;
} }
...@@ -189,6 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { ...@@ -189,6 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) {
*pitem = pNode->item; *pitem = pNode->item;
*type = pNode->type; *type = pNode->type;
num = 1; num = 1;
//pTrace("item:%p is fetched", *pitem);
} }
return num; return num;
...@@ -199,19 +208,6 @@ void taosResetQitems(taos_qall param) { ...@@ -199,19 +208,6 @@ void taosResetQitems(taos_qall param) {
qall->current = qall->start; qall->current = qall->start;
} }
void taosFreeQitems(taos_qall param) {
STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode;
while (qall->current) {
pNode = qall->current;
qall->current = pNode->next;
free(pNode);
}
free(qall);
}
taos_qset taosOpenQset() { taos_qset taosOpenQset() {
STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1); STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
...@@ -329,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand ...@@ -329,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
return code; return code;
} }
int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) { int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
STaosQueue *queue; STaosQueue *queue;
STaosQall *qall = NULL; STaosQall *qall = (STaosQall *)p2;
int code = 0; int code = 0;
for(int i=0; i<qset->numOfQueues; ++i) { for(int i=0; i<qset->numOfQueues; ++i) {
...@@ -347,23 +343,17 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) { ...@@ -347,23 +343,17 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) {
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1); qall->current = queue->head;
if (qall == NULL) { qall->start = queue->head;
terrno = TSDB_CODE_NO_RESOURCE; qall->numOfItems = queue->numOfItems;
code = -1; qall->itemSize = queue->itemSize;
} else { code = qall->numOfItems;
qall->current = queue->head; *phandle = queue->ahandle;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
*phandle = queue->ahandle;
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
}
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
...@@ -371,8 +361,6 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) { ...@@ -371,8 +361,6 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) {
if (code != 0) break; if (code != 0) break;
} }
*res = qall;
return code; return code;
} }
......
...@@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ...@@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
struct sockaddr_in serverAddr, clientAddr; struct sockaddr_in serverAddr, clientAddr;
int ret; int ret;
pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp); // pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp);
sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
...@@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ...@@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) { if (ret != 0) {
pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); //pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
sockFd = -1; sockFd = -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册