提交 14c63859 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

:add queue support

上级 8c87d83a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCHED_H
#define TDENGINE_TSCHED_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct _sched_msg {
void *msg;
int msgLen;
int8_t type;
int32_t code;
void *handle;
} SRpcMsg;
void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label);
int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg);
void taosCleanUpMsgQueue(void *param);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCHED_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCHED_H
#define TDENGINE_TSCHED_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct _sched_msg {
void *msg;
int msgLen;
int8_t type;
int32_t code
void handle;
} SRpcMsg;
void *rpcInitMsgQueue(int queueSize, void (*fp)(int num, SRpcQueue *),const char *label);
int rpcPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg);
void rpcCleanUpMsgQueue(void *param);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCHED_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tlog.h"
#include "tqueue.h"
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
typedef struct {
char label[16];
int num;
tsem_t emptySem;
tsem_t fullSem;
pthread_mutex_t queueMutex;
int fullSlot;
int emptySlot;
int queueSize;
SRpcMsg *queue;
SRpcMsg *oqueue;
pthread_t qthread;
void (*fp)(int num, SRpcMsg *);
} SRpcQueue;
static void *taosProcessMsgQueue(void *param);
void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label) {
pthread_attr_t attr;
SRpcQueue * pQueue = (SRpcQueue *)malloc(sizeof(SRpcQueue));
if (pQueue == NULL) {
pError("%s: no enough memory for pQueue, reason: %s", label, strerror(errno));
goto _error;
}
memset(pQueue, 0, sizeof(SRpcQueue));
pQueue->queueSize = queueSize;
strncpy(pQueue->label, label, sizeof(pQueue->label)); // fix buffer overflow
pQueue->label[sizeof(pQueue->label)-1] = '\0';
pQueue->fp = fp;
if (pthread_mutex_init(&pQueue->queueMutex, NULL) < 0) {
pError("init %s:queueMutex failed, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
if (tsem_init(&pQueue->emptySem, 0, (unsigned int)pQueue->queueSize) != 0) {
pError("init %s:empty semaphore failed, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
if (tsem_init(&pQueue->fullSem, 0, 0) != 0) {
pError("init %s:full semaphore failed, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
if ((pQueue->queue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) {
pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
memset(pQueue->queue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg));
if ((pQueue->oqueue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) {
pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
memset(pQueue->oqueue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg));
pQueue->fullSlot = 0;
pQueue->fullSlot = 0;
pQueue->emptySlot = 0;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pQueue->qthread, &attr, taosProcessMsgQueue, (void *)pQueue) != 0) {
pError("%s: failed to create taos thread, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
pTrace("%s RPC msg queue is initialized", pQueue->label);
return (void *)pQueue;
_error:
taosCleanUpMsgQueue(pQueue);
return NULL;
}
void *taosProcessMsgQueue(void *param) {
SRpcQueue *pQueue = (SRpcQueue *)param;
int num = 0;
while (1) {
if (tsem_wait(&pQueue->fullSem) != 0) {
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
pTrace("wait %s fullSem was interrupted", pQueue->label);
continue;
}
pError("wait %s fullSem failed, errno:%d, reason:%s", pQueue->label, errno, strerror(errno));
}
if (pthread_mutex_lock(&pQueue->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
num = 0;
do {
pQueue->oqueue[num] = pQueue->queue[pQueue->fullSlot];
pQueue->fullSlot = (pQueue->fullSlot + 1) % pQueue->queueSize;
++num;
pQueue->num--;
} while (pQueue->fullSlot != pQueue->emptySlot);
if (pthread_mutex_unlock(&pQueue->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s\n", pQueue->label, strerror(errno));
for (int i= 0; i<num; ++i) {
if (tsem_post(&pQueue->emptySem) != 0)
pError("post %s emptySem failed, reason:%s\n", pQueue->label, strerror(errno));
}
for (int i=0; i<num-1; ++i) {
if (tsem_wait(&pQueue->fullSem) != 0)
pError("wait %s fullSem failed, reason:%s\n", pQueue->label, strerror(errno));
}
(*pQueue->fp)(num, pQueue->oqueue);
}
return NULL;
}
int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg) {
SRpcQueue *pQueue = (SRpcQueue *)qhandle;
if (pQueue == NULL) {
pError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
}
while (tsem_wait(&pQueue->emptySem) != 0) {
if (errno != EINTR) {
pError("wait %s emptySem failed, reason:%s", pQueue->label, strerror(errno));
break;
}
}
if (pthread_mutex_lock(&pQueue->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
pQueue->queue[pQueue->emptySlot] = *pMsg;
pQueue->emptySlot = (pQueue->emptySlot + 1) % pQueue->queueSize;
pQueue->num++;
if (pthread_mutex_unlock(&pQueue->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
if (tsem_post(&pQueue->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pQueue->label, strerror(errno));
return 0;
}
void taosCleanUpMsgQueue(void *param) {
SRpcQueue *pQueue = (SRpcQueue *)param;
if (pQueue == NULL) return;
pthread_cancel(pQueue->qthread);
tsem_destroy(&pQueue->emptySem);
tsem_destroy(&pQueue->fullSem);
pthread_mutex_destroy(&pQueue->queueMutex);
free(pQueue->queue);
free(pQueue);
}
......@@ -17,24 +17,35 @@
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include "tqueue.h"
#include <stdint.h>
int msgSize = 128;
int commit = 0;
int dataFd = -1;
void *qhandle = NULL;
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) {
static int num = 0;
tTrace("request is received, type:%d, contLen:%d", type, contLen);
if (dataFd >=0) {
if ( write(dataFd, pCont, contLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno));
tTrace("%d shell msgs are received", numOfMsgs);
for (int i=0; i<numOfMsgs; ++i) {
if (dataFd >=0) {
if ( write(dataFd, pMsg->msg, pMsg->msgLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno));
}
}
void *rsp = rpcMallocCont(msgSize);
rpcSendResponse(pMsg->handle, 1, rsp, msgSize);
rpcFreeCont(pMsg->msg);
pMsg++;
}
if (commit >=2) {
++num;
num += numOfMsgs;
if ( fsync(dataFd) < 0 ) {
tPrint("failed to flush data to file, reason:%s", strerror(errno));
}
......@@ -44,9 +55,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32
}
}
void *rsp = rpcMallocCont(msgSize);
rpcSendResponse(thandle, 1, rsp, msgSize);
/*
SRpcIpSet ipSet;
......@@ -58,7 +66,17 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32
rpcSendRedirectRsp(ahandle, &ipSet);
*/
rpcFreeCont(pCont);
}
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
tTrace("request is received, type:%d, contLen:%d", type, contLen);
SRpcMsg rpcMsg;
rpcMsg.msg = pCont;
rpcMsg.msgLen = contLen;
rpcMsg.code = code;
rpcMsg.handle = thandle;
rpcMsg.type = type;
taosPutIntoMsgQueue(qhandle, &rpcMsg);
}
int main(int argc, char *argv[]) {
......@@ -91,6 +109,7 @@ int main(int argc, char *argv[]) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
uDebugFlag = rpcDebugFlag;
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
......@@ -125,6 +144,8 @@ int main(int argc, char *argv[]) {
tPrint("failed to open data file, reason:%s", strerror(errno));
}
qhandle = taosInitMsgQueue(1000, processShellMsg, "SER");
// loop forever
while(1) {
sleep(1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册