提交 183e444e 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

change tqueue

上级 87cdd051
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
int32_t dnodeInitWrite(); int32_t dnodeInitWrite();
void dnodeCleanupWrite(); void dnodeCleanupWrite();
void dnodeWrite(void *pMsg); void dnodeWrite(void *pMsg);
void * dnodeAllocateWriteWorker(); void * dnodeAllocateWriteWorker(void *pVnode);
void dnodeFreeWriteWorker(void *worker); void dnodeFreeWriteWorker(void *worker);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -152,8 +152,8 @@ static int32_t dnodeOpenVnode(int32_t vgId) { ...@@ -152,8 +152,8 @@ static int32_t dnodeOpenVnode(int32_t vgId) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker(); vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj);
vnodeObj.rworker = dnodeAllocateReadWorker(); vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj);
vnodeObj.wal = NULL; vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb; vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL; vnodeObj.replica = NULL;
...@@ -217,8 +217,8 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -217,8 +217,8 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker(); vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj);
vnodeObj.rworker = dnodeAllocateReadWorker(); vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj);
vnodeObj.wal = NULL; vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb; vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL; vnodeObj.replica = NULL;
......
...@@ -33,16 +33,15 @@ typedef struct { ...@@ -33,16 +33,15 @@ typedef struct {
void *pCont; void *pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
void *pVnode;
SRpcContext *pRpcContext; // RPC message context SRpcContext *pRpcContext; // RPC message context
} SReadMsg; } SReadMsg;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *param);
static void dnodeProcessReadResult(SReadMsg *pRead); static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead);
static void dnodeHandleIdleReadWorker(); static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(SReadMsg *pMsg); static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg); static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode); static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode);
// module global variable // module global variable
static taos_qset readQset; static taos_qset readQset;
...@@ -93,15 +92,14 @@ void dnodeRead(void *rpcMsg) { ...@@ -93,15 +92,14 @@ void dnodeRead(void *rpcMsg) {
} }
// put message into queue // put message into queue
SReadMsg readMsg; SReadMsg *pReadMsg = taosAllocateQitem(sizeof(SReadMsg));
readMsg.rpcMsg = *pMsg; pReadMsg->rpcMsg = *pMsg;
readMsg.pCont = pCont; pReadMsg->pCont = pCont;
readMsg.contLen = contLen; pReadMsg->contLen = contLen;
readMsg.pRpcContext = pRpcContext; pReadMsg->pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
taos_queue queue = dnodeGetVnodeRworker(pVnode); taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, &readMsg); taosWriteQitem(queue, 0, pReadMsg);
// next vnode // next vnode
leftLen -= contLen; leftLen -= contLen;
...@@ -111,11 +109,11 @@ void dnodeRead(void *rpcMsg) { ...@@ -111,11 +109,11 @@ void dnodeRead(void *rpcMsg) {
} }
} }
void *dnodeAllocateReadWorker() { void *dnodeAllocateReadWorker(void *pVnode) {
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
if (queue == NULL) return NULL; if (queue == NULL) return NULL;
taosAddIntoQset(readQset, queue); taosAddIntoQset(readQset, queue, pVnode);
// spawn a thread to process queue // spawn a thread to process queue
if (threads < maxThreads) { if (threads < maxThreads) {
...@@ -140,22 +138,25 @@ void dnodeFreeReadWorker(void *rqueue) { ...@@ -140,22 +138,25 @@ void dnodeFreeReadWorker(void *rqueue) {
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
taos_qset qset = (taos_qset)param; taos_qset qset = (taos_qset)param;
SReadMsg readMsg; SReadMsg *pReadMsg;
int type;
void *pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(qset, &readMsg) <= 0) { if (taosReadQitemFromQset(qset, &type, &pReadMsg, &pVnode) == 0) {
dnodeHandleIdleReadWorker(); dnodeHandleIdleReadWorker();
continue; continue;
} }
terrno = 0; terrno = 0;
if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) { if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg); (*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
dnodeProcessReadResult(&readMsg); dnodeProcessReadResult(pVnode, pReadMsg);
taosFreeQitem(pReadMsg);
} }
return NULL; return NULL;
...@@ -173,11 +174,11 @@ static void dnodeHandleIdleReadWorker() { ...@@ -173,11 +174,11 @@ static void dnodeHandleIdleReadWorker() {
} }
} }
static void dnodeProcessReadResult(SReadMsg *pRead) { static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
SRpcContext *pRpcContext = pRead->pRpcContext; SRpcContext *pRpcContext = pRead->pRpcContext;
int32_t code = 0; int32_t code = 0;
dnodeReleaseVnode(pRead->pVnode); dnodeReleaseVnode(pVnode);
if (pRpcContext) { if (pRpcContext) {
if (terrno) { if (terrno) {
...@@ -204,10 +205,10 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { ...@@ -204,10 +205,10 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
} }
static void dnodeProcessQueryMsg(SReadMsg *pMsg) { static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
} }
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
} }
...@@ -33,7 +33,6 @@ typedef struct _write { ...@@ -33,7 +33,6 @@ typedef struct _write {
void *pCont; void *pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
void *pVnode; // pointer to vnode
SRpcContext *pRpcContext; // RPC message context SRpcContext *pRpcContext; // RPC message context
} SWriteMsg; } SWriteMsg;
...@@ -49,20 +48,20 @@ typedef struct _thread_obj { ...@@ -49,20 +48,20 @@ typedef struct _thread_obj {
SWriteWorker *writeWorker; SWriteWorker *writeWorker;
} SWriteWorkerPool; } SWriteWorkerPool;
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *); static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *);
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker); static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
static void dnodeProcessWriteResult(SWriteMsg *pWrite); static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite);
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg); static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg); static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg); static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg);
SWriteWorkerPool wWorkerPool; SWriteWorkerPool wWorkerPool;
int32_t dnodeInitWrite() { int32_t dnodeInitWrite() {
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg; dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg; dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg;
wWorkerPool.max = tsNumOfCores; wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
...@@ -107,15 +106,14 @@ void dnodeWrite(void *rpcMsg) { ...@@ -107,15 +106,14 @@ void dnodeWrite(void *rpcMsg) {
} }
// put message into queue // put message into queue
SWriteMsg writeMsg; SWriteMsg *pWriteMsg = taosAllocateQitem(sizeof(SWriteMsg));
writeMsg.rpcMsg = *pMsg; pWriteMsg->rpcMsg = *pMsg;
writeMsg.pCont = pCont; pWriteMsg->pCont = pCont;
writeMsg.contLen = contLen; pWriteMsg->contLen = contLen;
writeMsg.pRpcContext = pRpcContext; pWriteMsg->pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
taos_queue queue = dnodeGetVnodeWworker(pVnode); taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg); taosWriteQitem(queue, 0, pWriteMsg);
// next vnode // next vnode
leftLen -= contLen; leftLen -= contLen;
...@@ -123,7 +121,7 @@ void dnodeWrite(void *rpcMsg) { ...@@ -123,7 +121,7 @@ void dnodeWrite(void *rpcMsg) {
} }
} }
void *dnodeAllocateWriteWorker() { void *dnodeAllocateWriteWorker(void *pVnode) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
...@@ -140,9 +138,9 @@ void *dnodeAllocateWriteWorker() { ...@@ -140,9 +138,9 @@ void *dnodeAllocateWriteWorker() {
} }
} }
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); taos_queue *queue = taosOpenQueue();
if (queue) { if (queue) {
taosAddIntoQset(pWorker->qset, queue); taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
} }
...@@ -158,11 +156,13 @@ void dnodeFreeWriteWorker(void *wqueue) { ...@@ -158,11 +156,13 @@ void dnodeFreeWriteWorker(void *wqueue) {
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param; SWriteWorker *pWorker = (SWriteWorker *)param;
taos_qall qall; taos_qall qall;
SWriteMsg writeMsg; SWriteMsg *pWriteMsg;
int32_t numOfMsgs; int32_t numOfMsgs;
int type;
void *pVnode;
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall, &pVnode);
if (numOfMsgs <=0) { if (numOfMsgs <=0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
continue; continue;
...@@ -170,7 +170,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -170,7 +170,7 @@ static void *dnodeProcessWriteQueue(void *param) {
for (int32_t i=0; i<numOfMsgs; ++i) { for (int32_t i=0; i<numOfMsgs; ++i) {
// retrieve all items, and write them into WAL // retrieve all items, and write them into WAL
taosGetQitem(qall, &writeMsg); taosGetQitem(qall, &type, &pWriteMsg);
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen); // walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
} }
...@@ -181,16 +181,16 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -181,16 +181,16 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(qall); taosResetQitems(qall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, &writeMsg); taosGetQitem(qall, &type, &pWriteMsg);
terrno = 0; terrno = 0;
if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) { if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) {
(*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg); (*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
dnodeProcessWriteResult(&writeMsg); dnodeProcessWriteResult(pVnode, pWriteMsg);
} }
// free the Qitems; // free the Qitems;
...@@ -200,11 +200,11 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -200,11 +200,11 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL; return NULL;
} }
static void dnodeProcessWriteResult(SWriteMsg *pWrite) { static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) {
SRpcContext *pRpcContext = pWrite->pRpcContext; SRpcContext *pRpcContext = pWrite->pRpcContext;
int32_t code = 0; int32_t code = 0;
dnodeReleaseVnode(pWrite->pVnode); dnodeReleaseVnode(pVnode);
if (pRpcContext) { if (pRpcContext) {
if (terrno) { if (terrno) {
...@@ -244,14 +244,14 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { ...@@ -244,14 +244,14 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
} }
} }
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { static void dnodeProcessSubmitMsg(void *param, SWriteMsg *pMsg) {
} }
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { static void dnodeProcessCreateTableMsg(void *param, SWriteMsg *pMsg) {
} }
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { static void dnodeProcessDropTableMsg(void *param, SWriteMsg *pMsg) {
} }
...@@ -78,7 +78,7 @@ void taosResetLogFile(); ...@@ -78,7 +78,7 @@ void taosResetLogFile();
// utility log function // utility log function
#define pError(...) \ #define pError(...) \
if (uDebugFlag & DEBUG_ERROR) { \ if (uDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR UTL ", 255, __VA_ARGS__); \ tprintf("ERROR UTL ", uDebugFlag, __VA_ARGS__); \
} }
#define pWarn(...) \ #define pWarn(...) \
if (uDebugFlag & DEBUG_WARN) { \ if (uDebugFlag & DEBUG_WARN) { \
......
...@@ -24,24 +24,26 @@ typedef void* taos_queue; ...@@ -24,24 +24,26 @@ typedef void* taos_queue;
typedef void* taos_qset; typedef void* taos_qset;
typedef void* taos_qall; typedef void* taos_qall;
taos_queue taosOpenQueue(int itemSize); taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue); void taosCloseQueue(taos_queue);
int taosWriteQitem(taos_queue, void *item); void *taosAllocateQitem(int size);
int taosReadQitem(taos_queue, void *item); void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item);
int taosReadQitem(taos_queue, int *type, void **pitem);
int taosReadAllQitems(taos_queue, taos_qall *); int taosReadAllQitems(taos_queue, taos_qall *);
int taosGetQitem(taos_qall, void *item); int taosGetQitem(taos_qall, int *type, void **pitem);
void taosResetQitems(taos_qall); void taosResetQitems(taos_qall);
void taosFreeQitems(taos_qall); void taosFreeQitems(taos_qall);
taos_qset taosOpenQset(); taos_qset taosOpenQset();
void taosCloseQset(); void taosCloseQset();
int taosAddIntoQset(taos_qset, taos_queue); int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue); void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset); int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, void *item); int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
int taosReadAllQitemsFromQset(taos_qset, taos_qall *); int taosReadAllQitemsFromQset(taos_qset, taos_qall *, void **handle);
int taosGetQueueItemsNumber(taos_queue param); int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param); int taosGetQsetItemsNumber(taos_qset param);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tqueue.h" #include "tqueue.h"
typedef struct _taos_qnode { typedef struct _taos_qnode {
int type;
struct _taos_qnode *next; struct _taos_qnode *next;
char item[]; char item[];
} STaosQnode; } STaosQnode;
...@@ -30,6 +31,7 @@ typedef struct _taos_q { ...@@ -30,6 +31,7 @@ typedef struct _taos_q {
struct _taos_qnode *tail; struct _taos_qnode *tail;
struct _taos_q *next; // for queue set struct _taos_q *next; // for queue set
struct _taos_qset *qset; // for queue set struct _taos_qset *qset; // for queue set
void *ahandle; // for queue set
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STaosQueue; } STaosQueue;
...@@ -48,7 +50,7 @@ typedef struct _taos_qall { ...@@ -48,7 +50,7 @@ typedef struct _taos_qall {
int32_t numOfItems; int32_t numOfItems;
} STaosQall; } STaosQall;
taos_queue taosOpenQueue(int itemSize) { taos_queue taosOpenQueue() {
STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1); STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
if (queue == NULL) { if (queue == NULL) {
...@@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) { ...@@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) {
} }
pthread_mutex_init(&queue->mutex, NULL); pthread_mutex_init(&queue->mutex, NULL);
queue->itemSize = (int32_t)itemSize;
return queue; return queue;
} }
...@@ -83,16 +83,24 @@ void taosCloseQueue(taos_queue param) { ...@@ -83,16 +83,24 @@ void taosCloseQueue(taos_queue param) {
free(queue); free(queue);
} }
int taosWriteQitem(taos_queue param, void *item) { void *taosAllocateQitem(int size) {
STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
if (pNode == NULL) return NULL;
return (void *)pNode->item;
}
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1); void taosFreeQitem(void *param) {
if ( pNode == NULL ) { if (param == NULL) return;
terrno = TSDB_CODE_NO_RESOURCE;
return -1; char *temp = (char *)param;
} temp -= sizeof(STaosQnode);
free(temp);
}
memcpy(pNode->item, item, queue->itemSize); int taosWriteQitem(taos_queue param, int type, void *item) {
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = (STaosQnode *)((char *)item - sizeof(STaosQnode));
pNode->type = type;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
...@@ -112,7 +120,7 @@ int taosWriteQitem(taos_queue param, void *item) { ...@@ -112,7 +120,7 @@ int taosWriteQitem(taos_queue param, void *item) {
return 0; return 0;
} }
int taosReadQitem(taos_queue param, void *item) { int taosReadQitem(taos_queue param, int *type, void **pitem) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int code = 0;
...@@ -121,11 +129,11 @@ int taosReadQitem(taos_queue param, void *item) { ...@@ -121,11 +129,11 @@ int taosReadQitem(taos_queue param, void *item) {
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize); *pitem = pNode->item;
*type = pNode->type;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) if (queue->head == NULL)
queue->tail = NULL; queue->tail = NULL;
free(pNode);
queue->numOfItems--; queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
...@@ -168,7 +176,7 @@ int taosReadAllQitems(taos_queue param, taos_qall *res) { ...@@ -168,7 +176,7 @@ int taosReadAllQitems(taos_queue param, taos_qall *res) {
return code; return code;
} }
int taosGetQitem(taos_qall param, void *item) { int taosGetQitem(taos_qall param, int *type, void **pitem) {
STaosQall *qall = (STaosQall *)param; STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode; STaosQnode *pNode;
int num = 0; int num = 0;
...@@ -178,7 +186,7 @@ int taosGetQitem(taos_qall param, void *item) { ...@@ -178,7 +186,7 @@ int taosGetQitem(taos_qall param, void *item) {
qall->current = pNode->next; qall->current = pNode->next;
if (pNode) { if (pNode) {
memcpy(item, pNode->item, qall->itemSize); *pitem = pNode->item;
num = 1; num = 1;
} }
...@@ -221,7 +229,7 @@ void taosCloseQset(taos_qset param) { ...@@ -221,7 +229,7 @@ void taosCloseQset(taos_qset param) {
free(qset); free(qset);
} }
int taosAddIntoQset(taos_qset p1, taos_queue p2) { int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2; STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1; STaosQset *qset = (STaosQset *)p1;
...@@ -230,6 +238,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) { ...@@ -230,6 +238,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) {
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
queue->next = qset->head; queue->next = qset->head;
queue->ahandle = ahandle;
qset->head = queue; qset->head = queue;
qset->numOfQueues++; qset->numOfQueues++;
...@@ -283,7 +292,7 @@ int taosGetQueueNumber(taos_qset param) { ...@@ -283,7 +292,7 @@ int taosGetQueueNumber(taos_qset param) {
return ((STaosQset *)param)->numOfQueues; return ((STaosQset *)param)->numOfQueues;
} }
int taosReadQitemFromQset(taos_qset param, void *item) { int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int code = 0;
...@@ -301,11 +310,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) { ...@@ -301,11 +310,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize); *pitem = pNode->item;
*type = pNode->type;
*phandle = queue->ahandle;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) if (queue->head == NULL)
queue->tail = NULL; queue->tail = NULL;
free(pNode);
queue->numOfItems--; queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
...@@ -318,7 +328,7 @@ int taosReadQitemFromQset(taos_qset param, void *item) { ...@@ -318,7 +328,7 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
return code; return code;
} }
int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res, void **phandle) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
STaosQueue *queue; STaosQueue *queue;
STaosQall *qall = NULL; STaosQall *qall = NULL;
...@@ -346,6 +356,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { ...@@ -346,6 +356,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
qall->numOfItems = queue->numOfItems; qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize; qall->itemSize = queue->itemSize;
code = qall->numOfItems; code = qall->numOfItems;
*phandle = queue->ahandle;
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
......
...@@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ...@@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
struct sockaddr_in serverAddr, clientAddr; struct sockaddr_in serverAddr, clientAddr;
int ret; int ret;
pTrace("open tcp client socket:%s:%d", destIp, destPort); pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp);
sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册