提交 97424093 编写于 作者: H hzcheng

Merge branch '2.0' into feature/2.0tsdb

...@@ -18,8 +18,6 @@ tests/test/ ...@@ -18,8 +18,6 @@ tests/test/
tests/taoshebei/ tests/taoshebei/
tests/taoscsv/ tests/taoscsv/
tests/taosdalipu/ tests/taosdalipu/
tests/sim/
tests/script/
tests/pytest/ tests/pytest/
tests/jenkins/ tests/jenkins/
tests/hdfs/ tests/hdfs/
...@@ -33,4 +31,6 @@ taoshebei/ ...@@ -33,4 +31,6 @@ taoshebei/
taosdalipu/ taosdalipu/
Target/ Target/
*.failed *.failed
*.sql *.sql
\ No newline at end of file sim/
...@@ -85,6 +85,7 @@ void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); ...@@ -85,6 +85,7 @@ void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg);
void rpcSendResponse(SRpcMsg *pMsg); void rpcSendResponse(SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -252,7 +252,7 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon ...@@ -252,7 +252,7 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time"); strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
......
...@@ -83,6 +83,9 @@ typedef struct { ...@@ -83,6 +83,9 @@ typedef struct {
int8_t oldInUse; // server IP inUse passed by app int8_t oldInUse; // server IP inUse passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API
SRpcIpSet *pSet; // for synchronous API
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
...@@ -183,6 +186,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); ...@@ -183,6 +186,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
...@@ -415,12 +419,12 @@ void rpcSendResponse(SRpcMsg *pMsg) { ...@@ -415,12 +419,12 @@ void rpcSendResponse(SRpcMsg *pMsg) {
rpcFreeMsg(pConn->pRspMsg); rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg; pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen; pConn->rspMsgLen = msgLen;
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
pConn->secured = 1; // connection shall be secured pConn->secured = 1; // connection shall be secured
...@@ -456,6 +460,26 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -456,6 +460,26 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return 0; return 0;
} }
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext;
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
memset(pRsp, 0, sizeof(SRpcMsg));
tsem_t sem;
tsem_init(&sem, 0, 0);
pContext->pSem = &sem;
pContext->pRsp = pRsp;
pContext->pSet = pIpSet;
rpcSendRequest(shandle, pIpSet, pMsg);
tsem_wait(&sem);
tsem_destroy(&sem);
return;
}
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
if ( msg ) { if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext); char *temp = (char *)msg - sizeof(SRpcReqContext);
...@@ -661,8 +685,12 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -661,8 +685,12 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pConn->inTranId == pHead->tranId) { if (pConn->inTranId == pHead->tranId) {
if (pConn->inType == pHead->msgType) { if (pConn->inType == pHead->msgType) {
tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); if (pHead->code == 0) {
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else {
// do nothing, it is heart beat from client
}
} else if (pConn->inType == 0) { } else if (pConn->inType == 0) {
tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHead->msgType], pConn->inTranId); taosMsg[pHead->msgType], pConn->inTranId);
...@@ -703,22 +731,23 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -703,22 +731,23 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_INVALID_RESPONSE_TYPE; return TSDB_CODE_INVALID_RESPONSE_TYPE;
} }
if (*pHead->content == TSDB_CODE_NOT_READY) { if (pHead->code == TSDB_CODE_NOT_READY) {
return TSDB_CODE_ALREADY_PROCESSED; return TSDB_CODE_ALREADY_PROCESSED;
} }
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
pConn->retry = 0; pConn->retry = 0;
if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) { if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) {
if (pConn->tretry <= tsRpcMaxRetry) { if (pConn->tretry <= tsRpcMaxRetry) {
pConn->tretry++;
tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); pConn->tretry++;
rpcSendReqHead(pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED; return TSDB_CODE_ALREADY_PROCESSED;
} else { } else {
// peer still in processing, give up // peer still in processing, give up
*pHead->content = TSDB_CODE_TOO_SLOW; return TSDB_CODE_TOO_SLOW;
} }
} }
...@@ -779,6 +808,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -779,6 +808,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
terrno = rpcProcessReqHead(pConn, pHead); terrno = rpcProcessReqHead(pConn, pHead);
pConn->connType = pRecv->connType; pConn->connType = pRecv->connType;
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
} else { } else {
terrno = rpcProcessRspHead(pConn, pHead); terrno = rpcProcessRspHead(pConn, pHead);
} }
...@@ -800,6 +830,18 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -800,6 +830,18 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
pContext->code = TSDB_CODE_NETWORK_UNAVAIL; pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
} }
if (pConn->inType) {
// if there are pending request, notify the app
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
(*(pRpc->cfp))(&rpcMsg);
}
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
...@@ -824,7 +866,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -824,7 +866,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pConn = rpcProcessMsgHead(pRpc, pRecv); pConn = rpcProcessMsgHead(pRpc, pRecv);
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d", tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
} }
...@@ -845,6 +887,26 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -845,6 +887,26 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
return pConn; return pConn;
} }
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
SRpcInfo *pRpc = pContext->pRpc;
if (pContext->pRsp) {
// for synchronous API
tsem_post(pContext->pSem);
memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet));
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
} else {
// for asynchronous API
if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect))
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pMsg);
}
// free the request message
rpcFreeCont(pContext->pCont);
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
...@@ -877,10 +939,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -877,10 +939,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} else { } else {
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) rpcNotifyClient(pContext, &rpcMsg);
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg
} }
} }
} }
...@@ -894,7 +953,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -894,7 +953,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead = (SRpcHead *)msg; pHead = (SRpcHead *)msg;
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType+1;
pHead->spi = 0; pHead->spi = pConn->spi;
pHead->encrypt = 0; pHead->encrypt = 0;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
...@@ -903,7 +962,29 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -903,7 +962,29 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code); pHead->code = htonl(code);
rpcSendMsgToPeer(pConn, msg, 0); rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
pConn->secured = 1; // connection shall be secured
}
static void rpcSendReqHead(SRpcConn *pConn) {
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead;
// set msg header
memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg;
pHead->version = 1;
pHead->msgType = pConn->outType;
pHead->spi = pConn->spi;
pHead->encrypt = 0;
pHead->tranId = pConn->outTranId;
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid;
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = 1;
rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
} }
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
...@@ -999,9 +1080,9 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -999,9 +1080,9 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else { } else {
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d", tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
...@@ -1027,8 +1108,8 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1027,8 +1108,8 @@ static void rpcProcessConnError(void *param, void *id) {
rpcMsg.code = pContext->code; rpcMsg.code = pContext->code;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
rpcMsg.contLen = 0; rpcMsg.contLen = 0;
(*(pRpc->cfp))(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg rpcNotifyClient(pContext, &rpcMsg);
} else { } else {
// move to next IP // move to next IP
pContext->ipSet.inUse++; pContext->ipSet.inUse++;
...@@ -1079,6 +1160,17 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -1079,6 +1160,17 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
if (pConn->user[0]) { if (pConn->user[0]) {
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
if (pConn->inType && pRpc->cfp) {
// if there are pending request, notify the app
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
(*(pRpc->cfp))(&rpcMsg);
}
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId);
......
...@@ -11,6 +11,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -11,6 +11,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_EXECUTABLE(rclient ${CLIENT_SRC}) ADD_EXECUTABLE(rclient ${CLIENT_SRC})
TARGET_LINK_LIBRARIES(rclient trpc) TARGET_LINK_LIBRARIES(rclient trpc)
LIST(APPEND SCLIENT_SRC ./rsclient.c)
ADD_EXECUTABLE(rsclient ${SCLIENT_SRC})
TARGET_LINK_LIBRARIES(rsclient trpc)
LIST(APPEND SERVER_SRC ./rserver.c) LIST(APPEND SERVER_SRC ./rserver.c)
ADD_EXECUTABLE(rserver ${SERVER_SRC}) ADD_EXECUTABLE(rserver ${SERVER_SRC})
TARGET_LINK_LIBRARIES(rserver trpc) TARGET_LINK_LIBRARIES(rserver trpc)
......
...@@ -40,7 +40,7 @@ typedef struct { ...@@ -40,7 +40,7 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
void processResponse(SRpcMsg *pMsg) { static void processResponse(SRpcMsg *pMsg) {
SInfo *pInfo = (SInfo *)pMsg->handle; SInfo *pInfo = (SInfo *)pMsg->handle;
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
...@@ -49,16 +49,16 @@ void processResponse(SRpcMsg *pMsg) { ...@@ -49,16 +49,16 @@ void processResponse(SRpcMsg *pMsg) {
sem_post(&pInfo->rspSem); sem_post(&pInfo->rspSem);
} }
void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle; SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet; pInfo->ipSet = *pIpSet;
} }
int tcount = 0; static int tcount = 0;
void *sendRequest(void *param) { static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param; SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#include <semaphore.h>
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include "taoserror.h"
#include <stdint.h>
#include <unistd.h>
typedef struct {
int index;
SRpcIpSet ipSet;
int num;
int numOfReqs;
int msgSize;
sem_t rspSem;
sem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet;
}
static int tcount = 0;
static int terror = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg, rspMsg;
tTrace("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.handle = pInfo;
rpcMsg.msgType = 1;
tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg);
// handle response
if (rspMsg.code != 0) terror++;
tTrace("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
rpcFreeCont(rspMsg.pCont);
if ( pInfo->num % 20000 == 0 )
tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
}
tTrace("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcIpSet ipSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
ipSet.numOfIps = 1;
ipSet.inUse = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr(serverIp);
ipSet.ip[1] = inet_addr("192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
// rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = "mypassword";
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-l localIp]: local IP address, default is:%s\n", rpcInit.localIp);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
dError("failed to initialize RPC");
return -1;
}
tPrint("client is initialized");
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->ipSet = ipSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
sem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // mseconds
tPrint("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLogger();
return 0;
}
...@@ -72,7 +72,7 @@ void processShellMsg() { ...@@ -72,7 +72,7 @@ void processShellMsg() {
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle; rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 1; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg); taosFreeQitem(pRpcMsg);
...@@ -126,9 +126,10 @@ void processRequestMsg(SRpcMsg *pMsg) { ...@@ -126,9 +126,10 @@ void processRequestMsg(SRpcMsg *pMsg) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
char dataName[20] = "server.data"; char dataName[20] = "server.data";
char localIp[40] = "0.0.0.0";
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0"; rpcInit.localIp = localIp;
rpcInit.localPort = 7000; rpcInit.localPort = 7000;
rpcInit.label = "SER"; rpcInit.label = "SER";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
...@@ -201,5 +202,3 @@ int main(int argc, char *argv[]) { ...@@ -201,5 +202,3 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
ADD_LIBRARY(wal ${SRC}) ADD_LIBRARY(twal ${SRC})
TARGET_INCLUDE_DIRECTORIES(wal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc) TARGET_LINK_LIBRARIES(twal tutil)
\ No newline at end of file
ADD_SUBDIRECTORY(test)
...@@ -14,19 +14,37 @@ ...@@ -14,19 +14,37 @@
*/ */
#ifndef _TD_WAL_H_ #ifndef _TD_WAL_H_
#define _TD_WAL_H_ #define _TD_WAL_H_
#include <stdint.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef void walh; // WAL HANDLE #define TAOS_WAL_NOLOG 0
#define TAOS_WAL_WRITE 1
#define TAOS_WAL_FSYNC 2
typedef struct {
int8_t msgType;
int8_t reserved[3];
int32_t len;
uint64_t version;
uint32_t signature;
uint32_t cksum;
char cont[];
} SWalHead;
typedef void* twal_h; // WAL HANDLE
twal_h walOpen(char *path, int max, int level);
void walClose(twal_h);
int walRenew(twal_h);
int walWrite(twal_h, SWalHead *);
void walFsync(twal_h);
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead));
int walGetWalFile(twal_h, char *name, uint32_t *index);
extern int wDebugFlag;
walh *vnodeOpenWal(int vnode, uint8_t op);
int vnodeCloseWal(walh *pWal);
int vnodeRenewWal(walh *pWal);
int vnodeWriteWal(walh *pWal, void *cont, int contLen);
int vnodeSyncWal(walh *pWal);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <dirent.h>
#include <unistd.h>
#include <fcntl.h>
#include "os.h"
#include "tlog.h"
#include "tchecksum.h"
#include "tutil.h"
#include "twal.h"
#define walPrefix "wal"
#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);}
#define wWarn(...) if (wDebugFlag & DEBUG_WARN) {tprintf("WARN WAL ", wDebugFlag, __VA_ARGS__);}
#define wTrace(...) if (wDebugFlag & DEBUG_TRACE) {tprintf("WAL ", wDebugFlag, __VA_ARGS__);}
#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);}
typedef struct {
int fd;
int level;
int max; // maximum number of wal files
uint32_t id; // increase continuously
int num; // number of wal files
char path[TSDB_FILENAME_LEN];
char name[TSDB_FILENAME_LEN];
pthread_mutex_t mutex;
} SWal;
int wDebugFlag = 135;
static uint32_t walSignature = 0xFAFBFDFE;
static int walHandleExistingFiles(char *path);
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *));
static int walRemoveWalFiles(char *path);
void *walOpen(char *path, int max, int level) {
SWal *pWal = calloc(sizeof(SWal), 1);
if (pWal == NULL) return NULL;
pWal->fd = -1;
pWal->max = max;
pWal->id = 0;
pWal->num = 0;
pWal->level = level;
strcpy(pWal->path, path);
pthread_mutex_init(&pWal->mutex, NULL);
if (access(path, F_OK) != 0) mkdir(path, 0755);
if (walHandleExistingFiles(path) == 0)
walRenew(pWal);
if (pWal->fd <0) {
wError("wal:%s, failed to open", path);
pthread_mutex_destroy(&pWal->mutex);
free(pWal);
pWal = NULL;
}
return pWal;
}
void walClose(void *handle) {
SWal *pWal = (SWal *)handle;
close(pWal->fd);
// remove all files in the directory
for (int i=0; i<pWal->num; ++i) {
sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id-i);
if (remove(pWal->name) <0) {
wError("wal:%s, failed to remove", pWal->name);
} else {
wTrace("wal:%s, it is removed", pWal->name);
}
}
pthread_mutex_destroy(&pWal->mutex);
free(pWal);
}
int walRenew(twal_h handle) {
SWal *pWal = (SWal *)handle;
int code = 0;
pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >=0) {
close(pWal->fd);
pWal->id++;
wTrace("wal:%s, it is closed", pWal->name);
}
pWal->num++;
sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) {
wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno));
code = -1;
} else {
wTrace("wal:%s, it is created", pWal->name);
if (pWal->num > pWal->max) {
// remove the oldest wal file
char name[TSDB_FILENAME_LEN];
sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
if (remove(name) <0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
} else {
wTrace("wal:%s, it is removed", name);
}
pWal->num--;
}
}
pthread_mutex_unlock(&pWal->mutex);
return code;
}
int walWrite(void *handle, SWalHead *pHead) {
SWal *pWal = (SWal *)handle;
int code = 0;
// no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0;
pHead->signature = walSignature;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal));
int contLen = pHead->len + sizeof(SWalHead);
if(write(pWal->fd, pHead, contLen) != contLen) {
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
code = -1;
}
return code;
}
void walFsync(void *handle) {
SWal *pWal = (SWal *)handle;
if (pWal->level == TAOS_WAL_FSYNC)
fsync(pWal->fd);
}
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
SWal *pWal = (SWal *)handle;
int code = 0;
struct dirent *ent;
int count = 0;
uint32_t maxId = 0, minId = -1, index =0;
int plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN];
sprintf(opath, "%s/old", pWal->path);
// is there old directory?
if (access(opath, F_OK)) return 0;
DIR *dir = opendir(opath);
while ((ent = readdir(dir))!= NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
index = atol(ent->d_name + plen);
if (index > maxId) maxId = index;
if (index < minId) minId = index;
count++;
}
}
if ( count != (maxId-minId+1) ) {
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
code = -1;
} else {
wTrace("wal:%s, %d files will be restored", opath, count);
for (index = minId; index<=maxId; ++index) {
sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index);
code = walRestoreWalFile(pWal->name, pVnode, writeFp);
if (code < 0) break;
}
}
if (code == 0) {
code = walRemoveWalFiles(opath);
if (code == 0) {
if (remove(opath) < 0) {
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
code = -1;
}
}
}
closedir(dir);
return code;
}
int walGetWalFile(void *handle, char *name, uint32_t *index) {
SWal *pWal = (SWal *)handle;
int code = 1;
int32_t first = 0;
name[0] = 0;
if (pWal == NULL || pWal->num == 0) return 0;
pthread_mutex_lock(&(pWal->mutex));
first = pWal->id + 1 - pWal->num;
if (*index == 0) *index = first; // set to first one
if (*index < first && *index > pWal->id) {
code = -1; // index out of range
} else {
sprintf(name, "%s/%s%d", pWal->path, walPrefix, *index);
code = (*index == pWal->id) ? 0:1;
}
pthread_mutex_unlock(&(pWal->mutex));
return code;
}
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) {
SWalHead walHead;
int code = 0;
int fd = open(name, O_RDONLY);
if (fd < 0) {
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
return -1;
}
wTrace("wal:%s, start to restore", name);
while (1) {
int ret = read(fd, &walHead, sizeof(walHead));
if ( ret == 0) { code = 0; break;}
if (ret != sizeof(walHead)) {
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
break;
}
if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
break;
}
char *buffer = malloc(sizeof(SWalHead) + walHead.len);
memcpy(buffer, &walHead, sizeof(walHead));
ret = read(fd, buffer+sizeof(walHead), walHead.len);
if ( ret != walHead.len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret);
break;
}
// write into queue
(*writeFp)(pVnode, buffer);
}
return code;
}
int walHandleExistingFiles(char *path) {
int code = 0;
char oname[TSDB_FILENAME_LEN];
char nname[TSDB_FILENAME_LEN];
char opath[TSDB_FILENAME_LEN];
sprintf(opath, "%s/old", path);
struct dirent *ent;
DIR *dir = opendir(path);
int plen = strlen(walPrefix);
if (access(opath, F_OK) == 0) {
// old directory is there, it means restore process is not finished
walRemoveWalFiles(path);
} else {
// move all files to old directory
int count = 0;
while ((ent = readdir(dir))!= NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
if (access(opath, F_OK) != 0) mkdir(opath, 0755);
sprintf(oname, "%s/%s", path, ent->d_name);
sprintf(nname, "%s/old/%s", path, ent->d_name);
if (rename(oname, nname) < 0) {
wError("wal:%s, failed to move to new:%s", oname, nname);
code = -1;
break;
}
count++;
}
}
wTrace("wal:%s, %d files are moved for restoration", path, count);
}
closedir(dir);
return code;
}
static int walRemoveWalFiles(char *path) {
int plen = strlen(walPrefix);
char name[TSDB_FILENAME_LEN];
int code = 0;
if (access(path, F_OK) != 0) return 0;
struct dirent *ent;
DIR *dir = opendir(path);
while ((ent = readdir(dir))!= NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
sprintf(name, "%s/%s", path, ent->d_name);
if (remove(name) <0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
code = -1; break;
}
}
}
closedir(dir);
return code;
}
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(../inc)
LIST(APPEND WALTEST_SRC ./waltest.c)
ADD_EXECUTABLE(waltest ${WALTEST_SRC})
TARGET_LINK_LIBRARIES(waltest twal)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include <stdint.h>
#include "tlog.h"
#include "twal.h"
int64_t ver = 0;
void *pWal = NULL;
int writeToQueue(void *pVnode, void *data) {
SWalHead *pHead = (SWalHead *)data;
// do nothing
if (pHead->version > ver)
ver = pHead->version;
walWrite(pWal, pHead);
free(data);
return 0;
}
int main(int argc, char *argv[]) {
char path[128] = "/home/jhtao/test/wal";
int max = 3;
int level = 2;
int total = 5;
int rows = 10000;
int size = 128;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
strcpy(path, argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
max = atoi(argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
level = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
rows = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
total = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
size = atoi(argv[++i]);
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
ver = atoll(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
ddebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-p path]: wal file path default is:%s\n", path);
printf(" [-m max]: max wal files, default is:%d\n", max);
printf(" [-l level]: log level, default is:%d\n", level);
printf(" [-t total]: total wal files, default is:%d\n", total);
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
printf(" [-v version]: initial version, default is:%ld\n", ver);
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("wal.log", 100000, 10);
pWal = walOpen(path, max, level);
if (pWal == NULL) {
printf("failed to open wal\n");
exit(-1);
}
int ret = walRestore(pWal, NULL, writeToQueue);
if (ret <0) {
printf("failed to restore wal\n");
exit(-1);
}
printf("version starts from:%ld\n", ver);
int contLen = sizeof(SWalHead) + size;
SWalHead *pHead = (SWalHead *) malloc(contLen);
for (int i=0; i<total; ++i) {
for (int k=0; k<rows; ++k) {
pHead->version = ++ver;
pHead->len = size;
walWrite(pWal, pHead);
}
printf("renew a wal, i:%d\n", i);
walRenew(pWal);
}
printf("%d wal files are written\n", total);
uint32_t index = 0;
char name[256];
while (1) {
int code = walGetWalFile(pWal, name, &index);
if (code == -1) {
printf("failed to get wal file, index:%d\n", index);
break;
}
printf("index:%d wal:%s\n", index, name);
if (code == 0) break;
index++;
}
getchar();
walClose(pWal);
return 0;
}
...@@ -10,3 +10,4 @@ SET(CMAKE_C_STANDARD 11) ...@@ -10,3 +10,4 @@ SET(CMAKE_C_STANDARD 11)
SET(CMAKE_VERBOSE_MAKEFILE ON) SET(CMAKE_VERBOSE_MAKEFILE ON)
ADD_SUBDIRECTORY(examples/c) ADD_SUBDIRECTORY(examples/c)
ADD_SUBDIRECTORY(tsim)
#################################
run general/user/testSuite.sim
##################################
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== show users
sql show users
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
print $data10 $data11 $data22
print $data20 $data11 $data22
print =============== create user1
sql create user user1 PASS 'user1'
sql show users
if $rows != 4 then
return -1
endi
print $data00 $data01 $data02
print $data10 $data11 $data22
print $data20 $data11 $data22
print $data30 $data31 $data32
print =============== create user2
sql create user user2 PASS 'user2'
sql show users
if $rows != 5 then
return -1
endi
print $data00 $data01 $data02
print $data10 $data11 $data22
print $data20 $data11 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
print =============== drop user1
sql drop user user1
sql show users
if $rows != 4 then
return -1
endi
print $data00 $data01 $data02
print $data10 $data11 $data22
print $data20 $data11 $data22
print $data30 $data31 $data32
print =============== restart taosd
system sh/exec.sh -n dnode1 -s stop
sleep 1000
system sh/exec.sh -n dnode1 -s start
print =============== show users
sql show users
if $rows != 4 then
return -1
endi
print $data00 $data01 $data02
print $data10 $data11 $data22
print $data20 $data11 $data22
print $data30 $data31 $data32
#################################
run general/user/basic.sim
##################################
#!/bin/sh
if [ $# != 6 ]; then
echo "argument list need input : "
echo " -n nodeName"
echo " -c configName"
echo " -v configValue"
exit 1
fi
NODE_NAME=
CONFIG_NAME=
CONFIG_VALUE=
while getopts "n:v:c:" arg
do
case $arg in
n)
NODE_NAME=$OPTARG
;;
c)
CONFIG_NAME=$OPTARG
;;
v)
CONFIG_VALUE=$OPTARG
;;
?)
echo "unkonw argument"
;;
esac
done
SCRIPT_DIR=`dirname $0`
cd $SCRIPT_DIR/../
SCRIPT_DIR=`pwd`
cd ../../
TAOS_DIR=`pwd`
BUILD_DIR=$TAOS_DIR/debug/build
SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/$NODE_NAME
TAOS_CFG=$NODE_DIR/cfg/taos.cfg
TAOS_FLAG=$SIM_DIR/tsim/flag
if [ -f "$TAOS_FLAG" ] ; then
TAOS_CFG=/etc/taos/taos.cfg
fi
echo "$CONFIG_NAME $CONFIG_VALUE" >> $TAOS_CFG
#!/bin/sh
echo "Executing clear.sh"
if [ $# != 6 ]; then
echo "argument list need input : "
echo " -n nodeName"
echo " -i nodeIp"
echo " -m masterIp"
exit 1
fi
NODE_NAME=
NODE_IP=
MSATER_IP=
while getopts "n:i:m:" arg
do
case $arg in
n)
NODE_NAME=$OPTARG
;;
i)
NODE_IP=$OPTARG
;;
m)
MASTER_IP=$OPTARG
;;
?)
echo "unkonw argument"
;;
esac
done
SCRIPT_DIR=`dirname $0`
cd $SCRIPT_DIR/../
SCRIPT_DIR=`pwd`
echo "SCRIPT_DIR: $SCRIPT_DIR"
cd ../../
TAOS_DIR=`pwd`
BUILD_DIR=$TAOS_DIR/debug/build
SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/$NODE_NAME
EXE_DIR=$BUILD_DIR/bin
CFG_DIR=$NODE_DIR/cfg
LOG_DIR=$NODE_DIR/log
DATA_DIR=$NODE_DIR/data
#echo ============ deploy $NODE_NAME
#echo === masterIp : $MASTER_IP
#echo === nodeIp : $NODE_IP
#echo === nodePath : $EXE_DIR
#echo === cfgPath : $CFG_DIR
#echo === logPath : $LOG_DIR
#echo === dataPath : $DATA_DIR
# rm -rf $NODE_DIR
mkdir -p $SIM_DIR
mkdir -p $NODE_DIR
mkdir -p $LOG_DIR
rm -rf $DATA_DIR
mkdir -p $DATA_DIR
#cp -rf $TAOS_DIR/cfg $NODE_DIR/
rm -rf $CFG_DIR
mkdir -p $CFG_DIR
#allow normal user to read/write log
chmod -R 777 $NODE_DIR
TAOS_CFG=$NODE_DIR/cfg/taos.cfg
touch -f $TAOS_CFG
TAOS_FLAG=$SIM_DIR/tsim/flag
if [ -f "$TAOS_FLAG" ] ; then
TAOS_CFG=/etc/taos/taos.cfg
DATA_DIR=/var/lib/taos
LOG_DIR=/var/log/taos
sudo rm -f /etc/taos/*.cfg
sudo cp -rf $TAOS_DIR/cfg/*.cfg /etc/taos
sudo rm -rf $DATA_DIR
sudo rm -rf $LOG_DIR
fi
echo " " >> $TAOS_CFG
echo "masterIp $MASTER_IP" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "publicIp $NODE_IP" >> $TAOS_CFG
echo "internalIp $NODE_IP" >> $TAOS_CFG
echo "privateIp $NODE_IP" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 131" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 131" >> $TAOS_CFG
echo "monitorDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 131" >> $TAOS_CFG
echo "jnidebugFlag 131" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG
echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG
echo "defaultPass taosdata" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "mgmtEqualVnodeNum 0" >> $TAOS_CFG
echo "clog 0" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG
echo "numOfTotalVnodes 4" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "numOfMPeers 1" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG
#!/bin/sh
echo "Executing deploy.sh"
if [ $# != 6 ]; then
echo "argument list need input : "
echo " -n nodeName"
echo " -i nodeIp"
echo " -m masterIp"
exit 1
fi
NODE_NAME=
NODE_IP=
MSATER_IP=
while getopts "n:i:m:" arg
do
case $arg in
n)
NODE_NAME=$OPTARG
;;
i)
NODE_IP=$OPTARG
;;
m)
MASTER_IP=$OPTARG
;;
?)
echo "unkonw argument"
;;
esac
done
SCRIPT_DIR=`dirname $0`
cd $SCRIPT_DIR/../
SCRIPT_DIR=`pwd`
echo "SCRIPT_DIR: $SCRIPT_DIR"
cd ../../
TAOS_DIR=`pwd`
BUILD_DIR=$TAOS_DIR/debug/build
SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/$NODE_NAME
EXE_DIR=$BUILD_DIR/bin
CFG_DIR=$NODE_DIR/cfg
LOG_DIR=$NODE_DIR/log
DATA_DIR=$NODE_DIR/data
#echo ============ deploy $NODE_NAME
#echo === masterIp : $MASTER_IP
#echo === nodeIp : $NODE_IP
#echo === nodePath : $EXE_DIR
#echo === cfgPath : $CFG_DIR
#echo === logPath : $LOG_DIR
#echo === dataPath : $DATA_DIR
rm -rf $NODE_DIR
mkdir -p $SIM_DIR
mkdir -p $NODE_DIR
mkdir -p $LOG_DIR
mkdir -p $DATA_DIR
#cp -rf $TAOS_DIR/cfg $NODE_DIR/
mkdir -p $CFG_DIR
#allow normal user to read/write log
chmod -R 777 $NODE_DIR
TAOS_CFG=$NODE_DIR/cfg/taos.cfg
touch -f $TAOS_CFG
TAOS_FLAG=$SIM_DIR/tsim/flag
if [ -f "$TAOS_FLAG" ] ; then
TAOS_CFG=/etc/taos/taos.cfg
DATA_DIR=/var/lib/taos
LOG_DIR=/var/log/taos
sudo rm -f /etc/taos/*.cfg
sudo cp -rf $TAOS_DIR/cfg/*.cfg /etc/taos
sudo rm -rf $DATA_DIR
sudo rm -rf $LOG_DIR
fi
echo " " >> $TAOS_CFG
echo "masterIp $MASTER_IP" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "publicIp $NODE_IP" >> $TAOS_CFG
echo "internalIp $NODE_IP" >> $TAOS_CFG
echo "privateIp $NODE_IP" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 131" >> $TAOS_CFG
echo "monitorDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 131" >> $TAOS_CFG
echo "jnidebugFlag 131" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG
echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG
echo "defaultPass taosdata" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "mgmtEqualVnodeNum 0" >> $TAOS_CFG
echo "clog 0" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG
echo "numOfTotalVnodes 4" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "numOfMPeers 1" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG
echo "anyIp 0" >> $TAOS_CFG
#!/bin/sh
# if [ $# != 4 || $# != 5 ]; then
# echo "argument list need input : "
# echo " -n nodeName"
# echo " -s start/stop"
# echo " -c clear"
# exit 1
# fi
NODE_NAME=
EXEC_OPTON=
CLEAR_OPTION="false"
while getopts "n:s:u:x:ct" arg
do
case $arg in
n)
NODE_NAME=$OPTARG
;;
s)
EXEC_OPTON=$OPTARG
;;
c)
CLEAR_OPTION="clear"
;;
t)
SHELL_OPTION="true"
;;
u)
USERS=$OPTARG
;;
x)
SIGNAL=$OPTARG
;;
?)
echo "unkown argument"
;;
esac
done
SCRIPT_DIR=`dirname $0`
cd $SCRIPT_DIR/../
SCRIPT_DIR=`pwd`
cd ../../
TAOS_DIR=`pwd`
BUILD_DIR=$TAOS_DIR/debug/build
SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/$NODE_NAME
EXE_DIR=$BUILD_DIR/bin
CFG_DIR=$NODE_DIR/cfg
LOG_DIR=$NODE_DIR/log
DATA_DIR=$NODE_DIR/data
MGMT_DIR=$NODE_DIR/data/mgmt
TSDB_DIR=$NODE_DIR/data/tsdb
TAOS_CFG=$NODE_DIR/cfg/taos.cfg
echo ------------ $EXEC_OPTON $NODE_NAME
TAOS_FLAG=$SIM_DIR/tsim/flag
if [ -f "$TAOS_FLAG" ]; then
EXE_DIR=/usr/local/bin/taos
fi
if [ "$CLEAR_OPTION" = "clear" ]; then
echo rm -rf $MGMT_DIR $TSDB_DIR
rm -rf $TSDB_DIR
rm -rf $MGMT_DIR
fi
if [ "$SHELL_OPTION" = "true" ]; then
if [ "$EXEC_OPTON" = "start" ]; then
echo "ExcuteCmd:" $EXE_DIR/taos -c $CFG_DIR -u $USERS -p
$EXE_DIR/taos -c $CFG_DIR -u $USERS -p
else
#relative path
RCFG_DIR=sim/$NODE_NAME/cfg
PID=`ps -ef|grep -v taosd | grep taos | grep $RCFG_DIR | grep -v grep | awk '{print $2}'`
if [ -n "$PID" ]; then
sudo kill -9 $PID
fi
fi
return
fi
if [ "$EXEC_OPTON" = "start" ]; then
echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR
nohup $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 &
#TT=`date +%s`
#mkdir ${LOG_DIR}/${TT}
#echo valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR
#nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 &
else
#relative path
RCFG_DIR=sim/$NODE_NAME/cfg
PID=`ps -ef|grep taosd | grep $RCFG_DIR | grep -v grep | awk '{print $2}'`
if [ -n "$PID" ]; then
if [ "$SIGNAL" = "SIGINT" ]; then
echo killed by signal
sudo kill -sigint $PID
else
sudo kill -9 $PID
fi
fi
fi
#!/bin/sh
#already create real card, such as 192.168.0.1-5
#exit 0
if [ $# != 4 ]; then
echo "argument list need input : "
echo " -i if use [192.168.0.70 ] then input [70]"
echo " -s up/down"
exit 1
fi
#just create ip like 192.168.0.*
IP_ADDRESS=
EXEC_OPTON=
while getopts "i:s:" arg
do
case $arg in
i)
IP_ADDRESS=$OPTARG
;;
s)
EXEC_OPTON=$OPTARG
;;
?)
echo "unkonw argument"
;;
esac
done
echo ============ $EXEC_OPTON $IP_ADDRESS ===========
sudo ifconfig lo:$IP_ADDRESS 192.168.0.$IP_ADDRESS $EXEC_OPTON
#!/bin/sh
PID=`ps -ef|grep /usr/bin/taosd | grep -v grep | awk '{print $2}'`
if [ -n "$PID" ]; then
echo sudo systemctl stop taosd
sudo systemctl stop taosd
fi
PID=`ps -ef|grep taosd | grep -v grep | awk '{print $2}'`
if [ -n "$PID" ]; then
echo sudo kill -9 $PID
sudo kill -9 $PID
fi
#!/bin/bash
##################################################
#
# Do simulation test
#
##################################################
set +e
FILE_NAME=
RELEASE=0
ASYNC=0
while getopts "f:a" arg
do
case $arg in
f)
FILE_NAME=$OPTARG
;;
a)
ASYNC=1
;;
?)
echo "unknow argument"
;;
esac
done
cd .
sh/ip.sh -i 1 -s up > /dev/null 2>&1 &
sh/ip.sh -i 2 -s up > /dev/null 2>&1 &
sh/ip.sh -i 3 -s up > /dev/null 2>&1 &
# Get responsible directories
CODE_DIR=`dirname $0`
CODE_DIR=`pwd`
cd ../../
TOP_DIR=`pwd`
BUILD_DIR=$TOP_DIR/debug/build
SIM_DIR=$TOP_DIR/sim
if [ $ASYNC -eq 0 ]; then
PROGRAM=$BUILD_DIR/bin/tsim
else
PROGRAM="$BUILD_DIR/bin/tsim -a"
fi
PRG_DIR=$SIM_DIR/tsim
CFG_DIR=$PRG_DIR/cfg
LOG_DIR=$PRG_DIR/log
DATA_DIR=$PRG_DIR/data
chmod -R 777 $PRG_DIR
echo "------------------------------------------------------------------------"
echo "Start TDengine Testing Case ..."
echo "BUILD_DIR: $BUILD_DIR"
echo "SIM_DIR : $SIM_DIR"
echo "CODE_DIR : $CODE_DIR"
echo "CFG_DIR : $CFG_DIR"
rm -rf $LOG_DIR
rm -rf $CFG_DIR
mkdir -p $PRG_DIR
mkdir -p $LOG_DIR
mkdir -p $CFG_DIR
TAOS_CFG=$PRG_DIR/cfg/taos.cfg
touch -f $TAOS_CFG
TAOS_FLAG=$PRG_DIR/flag
echo " " >> $TAOS_CFG
echo "scriptDir ${CODE_DIR}/../script">> $TAOS_CFG
echo "masterIp 192.168.0.1" >> $TAOS_CFG
echo "secondIp 192.168.0.2" >> $TAOS_CFG
echo "localIp 127.0.0.1" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "clog 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG
echo " " >> $TAOS_CFG
ulimit -n 600000
ulimit -c unlimited
#sudo sysctl -w kernel.core_pattern=$TOP_DIR/core.%p.%e
if [ -n "$FILE_NAME" ]; then
echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f $FILE_NAME
echo "------------------------------------------------------------------------"
#valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME
$PROGRAM -c $CFG_DIR -f $FILE_NAME
else
echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f basicSuite.sim
echo "------------------------------------------------------------------------"
$PROGRAM -c $CFG_DIR -f basicSuite.sim
fi
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/exec.sh -n dnode1 -s start
sql connect
\ No newline at end of file
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_WINDOWS_64)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
ENDIF ()
AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(tsim ${SRC})
TARGET_LINK_LIBRARIES(tsim taos_static trpc tutil pthread )
/*
Copyright (c) 2009-2017 Dave Gamble and cJSON contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef cJSON__h
#define cJSON__h
#ifdef __cplusplus
extern "C"
{
#endif
/* project version */
#define CJSON_VERSION_MAJOR 1
#define CJSON_VERSION_MINOR 5
#define CJSON_VERSION_PATCH 9
#include <stddef.h>
#include <stdint.h>
/* cJSON Types: */
#define cJSON_Invalid (0)
#define cJSON_False (1 << 0)
#define cJSON_True (1 << 1)
#define cJSON_NULL (1 << 2)
#define cJSON_Number (1 << 3)
#define cJSON_String (1 << 4)
#define cJSON_Array (1 << 5)
#define cJSON_Object (1 << 6)
#define cJSON_Raw (1 << 7) /* raw json */
#define cJSON_IsReference 256
#define cJSON_StringIsConst 512
/* The cJSON structure: */
typedef struct cJSON
{
/* next/prev allow you to walk array/object chains. Alternatively, use GetArraySize/GetArrayItem/GetObjectItem */
struct cJSON *next;
struct cJSON *prev;
/* An array or object item will have a child pointer pointing to a chain of the items in the array/object. */
struct cJSON *child;
/* The type of the item, as above. */
int type;
/* The item's string, if type==cJSON_String and type == cJSON_Raw */
char *valuestring;
/* writing to valueint is DEPRECATED, use cJSON_SetNumberValue instead */
int64_t valueint;
/* The item's number, if type==cJSON_Number */
double valuedouble;
/* The item's name string, if this item is the child of, or is in the list of subitems of an object. */
char *string;
//Keep the original string of number
char numberstring[13];
} cJSON;
typedef struct cJSON_Hooks
{
void *(*malloc_fn)(size_t sz);
void (*free_fn)(void *ptr);
} cJSON_Hooks;
typedef int cJSON_bool;
#if !defined(__WINDOWS__) && (defined(WIN32) || defined(WIN64) || defined(_MSC_VER) || defined(_WIN32))
#define __WINDOWS__
#endif
#ifdef __WINDOWS__
/* When compiling for windows, we specify a specific calling convention to avoid issues where we are being called from a project with a different default calling convention. For windows you have 2 define options:
CJSON_HIDE_SYMBOLS - Define this in the case where you don't want to ever dllexport symbols
CJSON_EXPORT_SYMBOLS - Define this on library build when you want to dllexport symbols (default)
CJSON_IMPORT_SYMBOLS - Define this if you want to dllimport symbol
For *nix builds that support visibility attribute, you can define similar behavior by
setting default visibility to hidden by adding
-fvisibility=hidden (for gcc)
or
-xldscope=hidden (for sun cc)
to CFLAGS
then using the CJSON_API_VISIBILITY flag to "export" the same symbols the way CJSON_EXPORT_SYMBOLS does
*/
/* export symbols by default, this is necessary for copy pasting the C and header file */
#if !defined(CJSON_HIDE_SYMBOLS) && !defined(CJSON_IMPORT_SYMBOLS) && !defined(CJSON_EXPORT_SYMBOLS)
#define CJSON_EXPORT_SYMBOLS
#endif
#if defined(CJSON_HIDE_SYMBOLS)
#define CJSON_PUBLIC(type) type __stdcall
#elif defined(CJSON_EXPORT_SYMBOLS)
#define CJSON_PUBLIC(type) __declspec(dllexport) type __stdcall
#elif defined(CJSON_IMPORT_SYMBOLS)
#define CJSON_PUBLIC(type) __declspec(dllimport) type __stdcall
#endif
#else /* !WIN32 */
#if (defined(__GNUC__) || defined(__SUNPRO_CC) || defined (__SUNPRO_C)) && defined(CJSON_API_VISIBILITY)
#define CJSON_PUBLIC(type) __attribute__((visibility("default"))) type
#else
#define CJSON_PUBLIC(type) type
#endif
#endif
/* Limits how deeply nested arrays/objects can be before cJSON rejects to parse them.
* This is to prevent stack overflows. */
#ifndef CJSON_NESTING_LIMIT
#define CJSON_NESTING_LIMIT 1000
#endif
/* returns the version of cJSON as a string */
CJSON_PUBLIC(const char*) cJSON_Version(void);
/* Supply malloc, realloc and free functions to cJSON */
CJSON_PUBLIC(void) cJSON_InitHooks(cJSON_Hooks* hooks);
/* Memory Management: the caller is always responsible to free the results from all variants of cJSON_Parse (with cJSON_Delete) and cJSON_Print (with stdlib free, cJSON_Hooks.free_fn, or cJSON_free as appropriate). The exception is cJSON_PrintPreallocated, where the caller has full responsibility of the buffer. */
/* Supply a block of JSON, and this returns a cJSON object you can interrogate. */
CJSON_PUBLIC(cJSON *) cJSON_Parse(const char *value);
/* ParseWithOpts allows you to require (and check) that the JSON is null terminated, and to retrieve the pointer to the final byte parsed. */
/* If you supply a ptr in return_parse_end and parsing fails, then return_parse_end will contain a pointer to the error so will match cJSON_GetErrorPtr(). */
CJSON_PUBLIC(cJSON *) cJSON_ParseWithOpts(const char *value, const char **return_parse_end, cJSON_bool require_null_terminated);
/* Render a cJSON entity to text for transfer/storage. */
CJSON_PUBLIC(char *) cJSON_Print(const cJSON *item);
/* Render a cJSON entity to text for transfer/storage without any formatting. */
CJSON_PUBLIC(char *) cJSON_PrintUnformatted(const cJSON *item);
/* Render a cJSON entity to text using a buffered strategy. prebuffer is a guess at the final size. guessing well reduces reallocation. fmt=0 gives unformatted, =1 gives formatted */
CJSON_PUBLIC(char *) cJSON_PrintBuffered(const cJSON *item, int prebuffer, cJSON_bool fmt);
/* Render a cJSON entity to text using a buffer already allocated in memory with given length. Returns 1 on success and 0 on failure. */
/* NOTE: cJSON is not always 100% accurate in estimating how much memory it will use, so to be safe allocate 5 bytes more than you actually need */
CJSON_PUBLIC(cJSON_bool) cJSON_PrintPreallocated(cJSON *item, char *buffer, const int length, const cJSON_bool format);
/* Delete a cJSON entity and all subentities. */
CJSON_PUBLIC(void) cJSON_Delete(cJSON *c);
/* Returns the number of items in an array (or object). */
CJSON_PUBLIC(int) cJSON_GetArraySize(const cJSON *array);
/* Retrieve item number "item" from array "array". Returns NULL if unsuccessful. */
CJSON_PUBLIC(cJSON *) cJSON_GetArrayItem(const cJSON *array, int index);
/* Get item "string" from object. Case insensitive. */
CJSON_PUBLIC(cJSON *) cJSON_GetObjectItem(const cJSON * const object, const char * const string);
CJSON_PUBLIC(cJSON *) cJSON_GetObjectItemCaseSensitive(const cJSON * const object, const char * const string);
CJSON_PUBLIC(cJSON_bool) cJSON_HasObjectItem(const cJSON *object, const char *string);
/* For analysing failed parses. This returns a pointer to the parse error. You'll probably need to look a few chars back to make sense of it. Defined when cJSON_Parse() returns 0. 0 when cJSON_Parse() succeeds. */
CJSON_PUBLIC(const char *) cJSON_GetErrorPtr(void);
/* These functions check the type of an item */
CJSON_PUBLIC(cJSON_bool) cJSON_IsInvalid(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsFalse(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsTrue(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsBool(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsNull(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsNumber(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsString(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsArray(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsObject(const cJSON * const item);
CJSON_PUBLIC(cJSON_bool) cJSON_IsRaw(const cJSON * const item);
/* These calls create a cJSON item of the appropriate type. */
CJSON_PUBLIC(cJSON *) cJSON_CreateNull(void);
CJSON_PUBLIC(cJSON *) cJSON_CreateTrue(void);
CJSON_PUBLIC(cJSON *) cJSON_CreateFalse(void);
CJSON_PUBLIC(cJSON *) cJSON_CreateBool(cJSON_bool boolean);
CJSON_PUBLIC(cJSON *) cJSON_CreateNumber(double num);
CJSON_PUBLIC(cJSON *) cJSON_CreateString(const char *string);
/* raw json */
CJSON_PUBLIC(cJSON *) cJSON_CreateRaw(const char *raw);
CJSON_PUBLIC(cJSON *) cJSON_CreateArray(void);
CJSON_PUBLIC(cJSON *) cJSON_CreateObject(void);
/* These utilities create an Array of count items. */
CJSON_PUBLIC(cJSON *) cJSON_CreateIntArray(const int *numbers, int count);
CJSON_PUBLIC(cJSON *) cJSON_CreateFloatArray(const float *numbers, int count);
CJSON_PUBLIC(cJSON *) cJSON_CreateDoubleArray(const double *numbers, int count);
CJSON_PUBLIC(cJSON *) cJSON_CreateStringArray(const char **strings, int count);
/* Append item to the specified array/object. */
CJSON_PUBLIC(void) cJSON_AddItemToArray(cJSON *array, cJSON *item);
CJSON_PUBLIC(void) cJSON_AddItemToObject(cJSON *object, const char *string, cJSON *item);
/* Use this when string is definitely const (i.e. a literal, or as good as), and will definitely survive the cJSON object.
* WARNING: When this function was used, make sure to always check that (item->type & cJSON_StringIsConst) is zero before
* writing to `item->string` */
CJSON_PUBLIC(void) cJSON_AddItemToObjectCS(cJSON *object, const char *string, cJSON *item);
/* Append reference to item to the specified array/object. Use this when you want to add an existing cJSON to a new cJSON, but don't want to corrupt your existing cJSON. */
CJSON_PUBLIC(void) cJSON_AddItemReferenceToArray(cJSON *array, cJSON *item);
CJSON_PUBLIC(void) cJSON_AddItemReferenceToObject(cJSON *object, const char *string, cJSON *item);
/* Remove/Detatch items from Arrays/Objects. */
CJSON_PUBLIC(cJSON *) cJSON_DetachItemViaPointer(cJSON *parent, cJSON * const item);
CJSON_PUBLIC(cJSON *) cJSON_DetachItemFromArray(cJSON *array, int which);
CJSON_PUBLIC(void) cJSON_DeleteItemFromArray(cJSON *array, int which);
CJSON_PUBLIC(cJSON *) cJSON_DetachItemFromObject(cJSON *object, const char *string);
CJSON_PUBLIC(cJSON *) cJSON_DetachItemFromObjectCaseSensitive(cJSON *object, const char *string);
CJSON_PUBLIC(void) cJSON_DeleteItemFromObject(cJSON *object, const char *string);
CJSON_PUBLIC(void) cJSON_DeleteItemFromObjectCaseSensitive(cJSON *object, const char *string);
/* Update array items. */
CJSON_PUBLIC(void) cJSON_InsertItemInArray(cJSON *array, int which, cJSON *newitem); /* Shifts pre-existing items to the right. */
CJSON_PUBLIC(cJSON_bool) cJSON_ReplaceItemViaPointer(cJSON * const parent, cJSON * const item, cJSON * replacement);
CJSON_PUBLIC(void) cJSON_ReplaceItemInArray(cJSON *array, int which, cJSON *newitem);
CJSON_PUBLIC(void) cJSON_ReplaceItemInObject(cJSON *object,const char *string,cJSON *newitem);
CJSON_PUBLIC(void) cJSON_ReplaceItemInObjectCaseSensitive(cJSON *object,const char *string,cJSON *newitem);
/* Duplicate a cJSON item */
CJSON_PUBLIC(cJSON *) cJSON_Duplicate(const cJSON *item, cJSON_bool recurse);
/* Duplicate will create a new, identical cJSON item to the one you pass, in new memory that will
need to be released. With recurse!=0, it will duplicate any children connected to the item.
The item->next and ->prev pointers are always zero on return from Duplicate. */
/* Recursively compare two cJSON items for equality. If either a or b is NULL or invalid, they will be considered unequal.
* case_sensitive determines if object keys are treated case sensitive (1) or case insensitive (0) */
CJSON_PUBLIC(cJSON_bool) cJSON_Compare(const cJSON * const a, const cJSON * const b, const cJSON_bool case_sensitive);
CJSON_PUBLIC(void) cJSON_Minify(char *json);
/* Macros for creating things quickly. */
#define cJSON_AddNullToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateNull())
#define cJSON_AddTrueToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateTrue())
#define cJSON_AddFalseToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateFalse())
#define cJSON_AddBoolToObject(object,name,b) cJSON_AddItemToObject(object, name, cJSON_CreateBool(b))
#define cJSON_AddNumberToObject(object,name,n) cJSON_AddItemToObject(object, name, cJSON_CreateNumber(n))
#define cJSON_AddStringToObject(object,name,s) cJSON_AddItemToObject(object, name, cJSON_CreateString(s))
#define cJSON_AddRawToObject(object,name,s) cJSON_AddItemToObject(object, name, cJSON_CreateRaw(s))
/* When assigning an integer value, it needs to be propagated to valuedouble too. */
#define cJSON_SetIntValue(object, number) ((object) ? (object)->valueint = (object)->valuedouble = (number) : (number))
/* helper for the cJSON_SetNumberValue macro */
CJSON_PUBLIC(double) cJSON_SetNumberHelper(cJSON *object, double number);
#define cJSON_SetNumberValue(object, number) ((object != NULL) ? cJSON_SetNumberHelper(object, (double)number) : (number))
/* Macro for iterating over an array or object */
#define cJSON_ArrayForEach(element, array) for(element = (array != NULL) ? (array)->child : NULL; element != NULL; element = element->next)
/* malloc/free objects using the malloc/free functions that have been set with cJSON_InitHooks */
CJSON_PUBLIC(void *) cJSON_malloc(size_t size);
CJSON_PUBLIC(void) cJSON_free(void *object);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __SIM_H__
#define __SIM_H__
#include <semaphore.h>
#include <stdbool.h>
#include <stdint.h>
#include "taos.h"
#include "tidpool.h"
#include "tlog.h"
#include "tmodule.h"
#include "tutil.h"
#define MAX_MAIN_SCRIPT_NUM 10
#define MAX_BACKGROUND_SCRIPT_NUM 10
#define MAX_FILE_NAME_LEN 256
#define MAX_ERROR_LEN 1024
#define MAX_QUERY_VALUE_LEN 40
#define MAX_QUERY_COL_NUM 10
#define MAX_QUERY_ROW_NUM 10
#define MAX_SYSTEM_RESULT_LEN 2048
#define MAX_VAR_LEN 100
#define MAX_VAR_NAME_LEN 32
#define MAX_VAR_VAL_LEN 80
#define MAX_OPT_NAME_LEN 32
#define MAX_SIM_CMD_NAME_LEN 40
#ifdef LINUX
#define SUCCESS_PREFIX "\033[44;32;1m"
#define SUCCESS_POSTFIX "\033[0m"
#define FAILED_PREFIX "\033[44;31;1m"
#define FAILED_POSTFIX "\033[0m"
#else
#define SUCCESS_PREFIX ""
#define SUCCESS_POSTFIX ""
#define FAILED_PREFIX ""
#define FAILED_POSTFIX ""
#endif
#define simError(...) \
if (simDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR SIM ", 255, __VA_ARGS__); \
}
#define simWarn(...) \
if (simDebugFlag & DEBUG_WARN) { \
tprintf("WARN SIM ", simDebugFlag, __VA_ARGS__); \
}
#define simTrace(...) \
if (simDebugFlag & DEBUG_TRACE) { \
tprintf("SIM ", simDebugFlag, __VA_ARGS__); \
}
#define simDump(x, y) \
if (simDebugFlag & DEBUG_DUMP) { \
taosDumpData(x, y); \
}
#define simPrint(...) \
{ tprintf("SIM ", 255, __VA_ARGS__); }
enum { SIM_SCRIPT_TYPE_MAIN, SIM_SCRIPT_TYPE_BACKGROUND };
enum {
SIM_CMD_EXP,
SIM_CMD_IF,
SIM_CMD_ELIF,
SIM_CMD_ELSE,
SIM_CMD_ENDI,
SIM_CMD_WHILE,
SIM_CMD_ENDW,
SIM_CMD_SWITCH,
SIM_CMD_CASE,
SIM_CMD_DEFAULT,
SIM_CMD_CONTINUE,
SIM_CMD_BREAK,
SIM_CMD_ENDS,
SIM_CMD_SLEEP,
SIM_CMD_GOTO,
SIM_CMD_RUN,
SIM_CMD_RUN_BACK,
SIM_CMD_PRINT,
SIM_CMD_SYSTEM,
SIM_CMD_SYSTEM_CONTENT,
SIM_CMD_SQL,
SIM_CMD_SQL_ERROR,
SIM_CMD_SQL_SLOW,
SIM_CMD_TEST,
SIM_CMD_RETURN,
SIM_CMD_END
};
enum {
SQL_JUMP_FALSE,
SQL_JUMP_TRUE,
};
struct _script_t;
typedef struct _cmd_t {
short cmdno;
short nlen;
char name[MAX_SIM_CMD_NAME_LEN];
bool (*parseCmd)(char *, struct _cmd_t *, int);
bool (*executeCmd)(struct _script_t *script, char *option);
struct _cmd_t *next;
} SCommand;
typedef struct {
short cmdno;
short jump; // jump position
short errorJump; // sql jump flag, while '-x' exist in sql cmd, this flag
// will be SQL_JUMP_TRUE, otherwise is SQL_JUMP_FALSE */
short lineNum; // correspodning line number in original file
int optionOffset; // relative option offset
} SCmdLine;
typedef struct _var_t {
char varName[MAX_VAR_NAME_LEN];
char varValue[MAX_VAR_VAL_LEN];
char varNameLen;
} SVariable;
typedef struct _script_t {
int type;
bool killed;
void *taos;
char rows[12]; // number of rows data retrieved
char data[MAX_QUERY_ROW_NUM][MAX_QUERY_COL_NUM]
[MAX_QUERY_VALUE_LEN]; // query results
char system_exit_code[12];
char system_ret_content[MAX_SYSTEM_RESULT_LEN];
int varLen;
int linePos; // current cmd position
int numOfLines; // number of lines in the script
int bgScriptLen;
char fileName[MAX_FILE_NAME_LEN]; // script file name
char error[MAX_ERROR_LEN];
char *optionBuffer;
SCmdLine *lines; // command list
SVariable variables[MAX_VAR_LEN];
struct _script_t *bgScripts[MAX_BACKGROUND_SCRIPT_NUM];
char auth[128];
} SScript;
extern SScript *simScriptList[MAX_MAIN_SCRIPT_NUM];
extern SCommand simCmdList[];
extern int simScriptPos;
extern int simScriptSucced;
extern int simDebugFlag;
extern char scriptDir[];
extern bool simAsyncQuery;
SScript *simParseScript(char *fileName);
SScript *simProcessCallOver(SScript *script);
void *simExecuteScript(void *script);
void simInitsimCmdList();
bool simSystemInit();
void simSystemCleanUp();
char *simGetVariable(SScript *script, char *varName, int varLen);
bool simExecuteExpCmd(SScript *script, char *option);
bool simExecuteTestCmd(SScript *script, char *option);
bool simExecuteGotoCmd(SScript *script, char *option);
bool simExecuteRunCmd(SScript *script, char *option);
bool simExecuteRunBackCmd(SScript *script, char *option);
bool simExecuteSystemCmd(SScript *script, char *option);
bool simExecuteSystemContentCmd(SScript *script, char *option);
bool simExecutePrintCmd(SScript *script, char *option);
bool simExecuteSleepCmd(SScript *script, char *option);
bool simExecuteReturnCmd(SScript *script, char *option);
bool simExecuteSqlCmd(SScript *script, char *option);
bool simExecuteSqlErrorCmd(SScript *script, char *rest);
bool simExecuteSqlSlowCmd(SScript *script, char *option);
void simVisuallizeOption(SScript *script, char *src, char *dst);
#endif
\ No newline at end of file
...@@ -12,16 +12,44 @@ ...@@ -12,16 +12,44 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdlib.h>
#include "vnodeWal.h" #ifndef __SIM_PARSE_H__
#define __SIM_PARSE_H__
#define MAX_NUM_CMD 64
#define MAX_NUM_LABLES 100
#define MAX_LABEL_LEN 40
#define MAX_NUM_BLOCK 100
#define MAX_NUM_JUMP 100
#define MAX_LINE_LEN 3000
#define MAX_CMD_LINES 2048
#define MAX_OPTION_BUFFER 64000
enum {
BLOCK_IF,
BLOCK_WHILE,
BLOCK_SWITCH,
};
/* label stack */
typedef struct { typedef struct {
/* TODO */ char top; /* number of labels */
} SWal; short pos[MAX_NUM_LABLES]; /* the position of the label */
char label[MAX_NUM_LABLES][MAX_LABEL_LEN]; /* name of the label */
} SLabel;
/* block definition */
typedef struct {
char top; /* the number of blocks stacked */
char type[MAX_NUM_BLOCK]; /* the block type */
short *pos[MAX_NUM_BLOCK]; /* position of the jump for if/elif/case */
short back[MAX_NUM_BLOCK]; /* go back, endw and continue */
char numJump[MAX_NUM_BLOCK];
short *jump[MAX_NUM_BLOCK][MAX_NUM_JUMP]; /* break or elif */
char sexp[MAX_NUM_BLOCK][40]; /*switch expression */
char sexpLen[MAX_NUM_BLOCK]; /*switch expression length */
} SBlock;
bool simParseExpression(char *token, int lineNum);
walh *vnodeOpenWal(int vnode, uint8_t op) { return NULL; } #endif
int vnodeCloseWal(walh *pWal) { return 0; } \ No newline at end of file
int vnodeRenewWal(walh *pWal) { return 0; }
int vnodeWriteWal(walh *pWal, void *cont, int contLen) { return 0; }
int vnodeSyncWal(walh *pWal) { return 0; }
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册