提交 fa9a4099 编写于 作者: S slguan

Merge branch 'refact/sync' into refact/slguan

......@@ -225,8 +225,8 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj);
vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj);
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
......@@ -314,8 +314,8 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj);
vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj);
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
......
......@@ -36,16 +36,15 @@ typedef struct {
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
void *pVnode;
SRpcContext *pRpcContext; // RPC message context
} SReadMsg;
static void *dnodeProcessReadQueue(void *param);
static void dnodeProcessReadResult(SReadMsg *pRead);
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead);
static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode);
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode);
// module global variable
static taos_qset readQset;
......@@ -93,7 +92,7 @@ void dnodeRead(SRpcMsg *pMsg) {
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1;//htonl(pHead->vgId);
pHead->vgId = 1; //htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId);
......@@ -104,22 +103,19 @@ void dnodeRead(SRpcMsg *pMsg) {
}
// put message into queue
SReadMsg readMsg;
readMsg.rpcMsg = *pMsg;
readMsg.pCont = pCont;
readMsg.contLen = pHead->contLen;
readMsg.pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = *pMsg;
pRead->pCont = pCont;
pRead->contLen = pHead->contLen;
pRead->pRpcContext = pRpcContext;
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, &readMsg);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// next vnode
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
queuedMsgNum++;
dnodeReleaseVnode(pVnode);
}
if (queuedMsgNum == 0) {
......@@ -134,11 +130,11 @@ void dnodeRead(SRpcMsg *pMsg) {
}
}
void *dnodeAllocateReadWorker() {
void *dnodeAllocateReadWorker(void *pVnode) {
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
if (queue == NULL) return NULL;
taosAddIntoQset(readQset, queue);
taosAddIntoQset(readQset, queue, pVnode);
// spawn a thread to process queue
if (threads < maxThreads) {
......@@ -163,22 +159,27 @@ void dnodeFreeReadWorker(void *rqueue) {
static void *dnodeProcessReadQueue(void *param) {
taos_qset qset = (taos_qset)param;
SReadMsg readMsg;
SReadMsg *pReadMsg;
int type;
void *pVnode;
while (1) {
if (taosReadQitemFromQset(qset, &readMsg) <= 0) {
if (taosReadQitemFromQset(qset, &type, &pReadMsg, &pVnode) == 0) {
dnodeHandleIdleReadWorker();
continue;
}
terrno = 0;
if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg);
if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
dnodeProcessReadResult(&readMsg);
dnodeProcessReadResult(pVnode, pReadMsg);
taosFreeQitem(pReadMsg);
dnodeReleaseVnode(pVnode);
}
return NULL;
......@@ -196,11 +197,11 @@ static void dnodeHandleIdleReadWorker() {
}
}
static void dnodeProcessReadResult(SReadMsg *pRead) {
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
SRpcContext *pRpcContext = pRead->pRpcContext;
int32_t code = 0;
dnodeReleaseVnode(pRead->pVnode);
dnodeReleaseVnode(pVnode);
if (pRpcContext) {
if (terrno) {
......
......@@ -35,7 +35,6 @@ typedef struct _write {
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
void *pVnode; // pointer to vnode
SRpcContext *pRpcContext; // RPC message context
} SWriteMsg;
......@@ -51,7 +50,7 @@ typedef struct _thread_obj {
SWriteWorker *writeWorker;
} SWriteWorkerPool;
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *);
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *);
static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
static void dnodeProcessWriteResult(SWriteMsg *pWrite);
......@@ -116,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) {
}
// put message into queue
SWriteMsg writeMsg;
writeMsg.rpcMsg = *pMsg;
writeMsg.pCont = pCont;
writeMsg.contLen = pHead->contLen;
writeMsg.pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
pWrite->pRpcContext = pRpcContext;
taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
// next vnode
leftLen -= pHead->contLen;
......@@ -144,16 +142,16 @@ void dnodeWrite(SRpcMsg *pMsg) {
}
}
void *dnodeAllocateWriteWorker() {
void *dnodeAllocateWriteWorker(void *pVnode) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
taos_queue *queue = taosOpenQueue();
if (queue == NULL) return NULL;
if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL;
taosAddIntoQset(pWorker->qset, queue);
taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
pthread_attr_t thAttr;
......@@ -165,7 +163,7 @@ void *dnodeAllocateWriteWorker() {
taosCloseQset(pWorker->qset);
}
} else {
taosAddIntoQset(pWorker->qset, queue);
taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
}
......@@ -181,11 +179,15 @@ void dnodeFreeWriteWorker(void *wqueue) {
static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param;
taos_qall qall;
SWriteMsg writeMsg;
SWriteMsg *pWriteMsg;
int32_t numOfMsgs;
int type;
void *pVnode;
qall = taosAllocateQall();
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall);
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode);
if (numOfMsgs <=0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
continue;
......@@ -193,7 +195,7 @@ static void *dnodeProcessWriteQueue(void *param) {
for (int32_t i=0; i<numOfMsgs; ++i) {
// retrieve all items, and write them into WAL
taosGetQitem(qall, &writeMsg);
taosGetQitem(qall, &type, &pWriteMsg);
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
}
......@@ -204,30 +206,31 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
taosResetQitems(qall);
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, &writeMsg);
taosGetQitem(qall, &type, &pWriteMsg);
terrno = 0;
if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) {
(*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg);
if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) {
(*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
dnodeProcessWriteResult(&writeMsg);
dnodeProcessWriteResult(pVnode, pWriteMsg);
taosFreeQitem(pWriteMsg);
}
// free the Qitems;
taosFreeQitems(qall);
}
taosFreeQall(qall);
return NULL;
}
static void dnodeProcessWriteResult(SWriteMsg *pWrite) {
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) {
SRpcContext *pRpcContext = pWrite->pRpcContext;
int32_t code = 0;
dnodeReleaseVnode(pWrite->pVnode);
dnodeReleaseVnode(pVnode);
if (pRpcContext) {
if (terrno) {
......
......@@ -28,10 +28,13 @@ void *qhandle = NULL;
void processShellMsg() {
static int num = 0;
taos_qall qall;
SRpcMsg rpcMsg;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, &qall);
int numOfMsgs = taosReadAllQitems(qhandle, qall);
if (numOfMsgs <= 0) {
usleep(1000);
continue;
......@@ -40,10 +43,10 @@ void processShellMsg() {
tTrace("%d shell msgs are received", numOfMsgs);
for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg);
taosGetQitem(qall, &type, (void **)&pRpcMsg);
if (dataFd >=0) {
if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) {
if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno));
}
}
......@@ -62,19 +65,22 @@ void processShellMsg() {
taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg);
rpcFreeCont(rpcMsg.pCont);
taosGetQitem(qall, &type, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = rpcMsg.handle;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 1;
rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg);
}
taosFreeQitems(qall);
}
taosFreeQall(qall);
/*
SRpcIpSet ipSet;
ipSet.numOfIps = 1;
......@@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
}
void processRequestMsg(SRpcMsg *pMsg) {
tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen);
taosWriteQitem(qhandle, pMsg);
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
}
int main(int argc, char *argv[]) {
......@@ -143,6 +154,7 @@ int main(int argc, char *argv[]) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
ddebugFlag = rpcDebugFlag;
uDebugFlag = rpcDebugFlag;
} else {
printf("\nusage: %s [options] \n", argv[0]);
......
......@@ -78,7 +78,7 @@ void taosResetLogFile();
// utility log function
#define pError(...) \
if (uDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR UTL ", 255, __VA_ARGS__); \
tprintf("ERROR UTL ", uDebugFlag, __VA_ARGS__); \
}
#define pWarn(...) \
if (uDebugFlag & DEBUG_WARN) { \
......
......@@ -20,28 +20,35 @@
extern "C" {
#endif
#define TAOS_QTYPE_RPC 0
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
typedef void* taos_queue;
typedef void* taos_qset;
typedef void* taos_qall;
taos_queue taosOpenQueue(int itemSize);
taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue);
int taosWriteQitem(taos_queue, void *item);
int taosReadQitem(taos_queue, void *item);
int taosReadAllQitems(taos_queue, taos_qall *);
int taosGetQitem(taos_qall, void *item);
void *taosAllocateQitem(int size);
void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item);
int taosReadQitem(taos_queue, int *type, void **pitem);
taos_qall taosAllocateQall();
void taosFreeQall(taos_qall);
int taosReadAllQitems(taos_queue, taos_qall);
int taosGetQitem(taos_qall, int *type, void **pitem);
void taosResetQitems(taos_qall);
void taosFreeQitems(taos_qall);
taos_qset taosOpenQset();
void taosCloseQset();
int taosAddIntoQset(taos_qset, taos_queue);
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, void *item);
int taosReadAllQitemsFromQset(taos_qset, taos_qall *);
int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle);
int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param);
......
......@@ -19,6 +19,7 @@
#include "tqueue.h"
typedef struct _taos_qnode {
int type;
struct _taos_qnode *next;
char item[];
} STaosQnode;
......@@ -30,6 +31,7 @@ typedef struct _taos_q {
struct _taos_qnode *tail;
struct _taos_q *next; // for queue set
struct _taos_qset *qset; // for queue set
void *ahandle; // for queue set
pthread_mutex_t mutex;
} STaosQueue;
......@@ -48,7 +50,7 @@ typedef struct _taos_qall {
int32_t numOfItems;
} STaosQall;
taos_queue taosOpenQueue(int itemSize) {
taos_queue taosOpenQueue() {
STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
if (queue == NULL) {
......@@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) {
}
pthread_mutex_init(&queue->mutex, NULL);
queue->itemSize = (int32_t)itemSize;
return queue;
}
......@@ -83,16 +83,26 @@ void taosCloseQueue(taos_queue param) {
free(queue);
}
int taosWriteQitem(taos_queue param, void *item) {
STaosQueue *queue = (STaosQueue *)param;
void *taosAllocateQitem(int size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
if (pNode == NULL) return NULL;
return (void *)pNode->item;
}
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1);
if ( pNode == NULL ) {
terrno = TSDB_CODE_NO_RESOURCE;
return -1;
}
void taosFreeQitem(void *param) {
if (param == NULL) return;
memcpy(pNode->item, item, queue->itemSize);
//pTrace("item:%p is freed", param);
char *temp = (char *)param;
temp -= sizeof(STaosQnode);
free(temp);
}
int taosWriteQitem(taos_queue param, int type, void *item) {
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
pNode->type = type;
pthread_mutex_lock(&queue->mutex);
......@@ -107,12 +117,14 @@ int taosWriteQitem(taos_queue param, void *item) {
queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
//pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex);
return 0;
}
int taosReadQitem(taos_queue param, void *item) {
int taosReadQitem(taos_queue param, int *type, void **pitem) {
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = NULL;
int code = 0;
......@@ -121,14 +133,15 @@ int taosReadQitem(taos_queue param, void *item) {
if (queue->head) {
pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize);
*pitem = pNode->item;
*type = pNode->type;
queue->head = pNode->next;
if (queue->head == NULL)
queue->tail = NULL;
free(pNode);
queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1;
//pTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
}
pthread_mutex_unlock(&queue->mutex);
......@@ -136,39 +149,42 @@ int taosReadQitem(taos_queue param, void *item) {
return code;
}
int taosReadAllQitems(taos_queue param, taos_qall *res) {
void *taosAllocateQall() {
void *p = malloc(sizeof(STaosQall));
return p;
}
void taosFreeQall(void *param) {
free(param);
}
int taosReadAllQitems(taos_queue param, taos_qall p2) {
STaosQueue *queue = (STaosQueue *)param;
STaosQall *qall = NULL;
STaosQall *qall = (STaosQall *)p2;
int code = 0;
pthread_mutex_lock(&queue->mutex);
if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
if ( qall == NULL ) {
terrno = TSDB_CODE_NO_RESOURCE;
code = -1;
} else {
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
}
memset(qall, 0, sizeof(STaosQall));
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
}
pthread_mutex_unlock(&queue->mutex);
*res = qall;
return code;
}
int taosGetQitem(taos_qall param, void *item) {
int taosGetQitem(taos_qall param, int *type, void **pitem) {
STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode;
int num = 0;
......@@ -178,8 +194,10 @@ int taosGetQitem(taos_qall param, void *item) {
qall->current = pNode->next;
if (pNode) {
memcpy(item, pNode->item, qall->itemSize);
*pitem = pNode->item;
*type = pNode->type;
num = 1;
//pTrace("item:%p is fetched", *pitem);
}
return num;
......@@ -190,19 +208,6 @@ void taosResetQitems(taos_qall param) {
qall->current = qall->start;
}
void taosFreeQitems(taos_qall param) {
STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode;
while (qall->current) {
pNode = qall->current;
qall->current = pNode->next;
free(pNode);
}
free(qall);
}
taos_qset taosOpenQset() {
STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
......@@ -221,7 +226,7 @@ void taosCloseQset(taos_qset param) {
free(qset);
}
int taosAddIntoQset(taos_qset p1, taos_queue p2) {
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1;
......@@ -230,6 +235,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) {
pthread_mutex_lock(&qset->mutex);
queue->next = qset->head;
queue->ahandle = ahandle;
qset->head = queue;
qset->numOfQueues++;
......@@ -283,7 +289,7 @@ int taosGetQueueNumber(taos_qset param) {
return ((STaosQset *)param)->numOfQueues;
}
int taosReadQitemFromQset(taos_qset param, void *item) {
int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) {
STaosQset *qset = (STaosQset *)param;
STaosQnode *pNode = NULL;
int code = 0;
......@@ -301,11 +307,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
if (queue->head) {
pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize);
*pitem = pNode->item;
*type = pNode->type;
*phandle = queue->ahandle;
queue->head = pNode->next;
if (queue->head == NULL)
queue->tail = NULL;
free(pNode);
queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1;
......@@ -318,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
return code;
}
int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
STaosQset *qset = (STaosQset *)param;
STaosQueue *queue;
STaosQall *qall = NULL;
STaosQall *qall = (STaosQall *)p2;
int code = 0;
for(int i=0; i<qset->numOfQueues; ++i) {
......@@ -336,22 +343,17 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
pthread_mutex_lock(&queue->mutex);
if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
if (qall == NULL) {
terrno = TSDB_CODE_NO_RESOURCE;
code = -1;
} else {
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
*phandle = queue->ahandle;
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
}
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
}
pthread_mutex_unlock(&queue->mutex);
......@@ -359,8 +361,6 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
if (code != 0) break;
}
*res = qall;
return code;
}
......
......@@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
struct sockaddr_in serverAddr, clientAddr;
int ret;
pTrace("open tcp client socket:%s:%d", destIp, destPort);
// pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp);
sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
......@@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) {
pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno));
//pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
sockFd = -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册