提交 66dd7d7e 编写于 作者: J jiacy-jcy

Merge branch '3.0' into 3.0test/jcy

......@@ -22,6 +22,7 @@ ELSEIF (TD_WINDOWS)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosd.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/udfd.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosBenchmark.exe DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.38-dist.jar DESTINATION connector/jdbc)
......
......@@ -97,13 +97,13 @@ IF ("${CPUTYPE}" STREQUAL "")
ELSE ()
# if generate ARM version:
# cmake -DCPUTYPE=aarch32 .. or cmake -DCPUTYPE=aarch64
IF (${CPUTYPE} MATCHES "aarch32" or ${CPUTYPE} MATCHES "arm32")
IF (${CPUTYPE} MATCHES "aarch32" OR ${CPUTYPE} MATCHES "arm32")
SET(PLATFORM_ARCH_STR "arm")
MESSAGE(STATUS "input cpuType: aarch32")
ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_32")
SET(TD_ARM_32 TRUE)
ELSEIF (${CPUTYPE} MATCHES "aarch64" or ${CPUTYPE} MATCHES "arm64")
ELSEIF (${CPUTYPE} MATCHES "aarch64" OR ${CPUTYPE} MATCHES "arm64")
SET(PLATFORM_ARCH_STR "arm64")
MESSAGE(STATUS "input cpuType: aarch64")
ADD_DEFINITIONS("-D_TD_ARM_")
......
此差异已折叠。
......@@ -47,8 +47,6 @@ typedef struct SRpcHandleInfo {
int8_t persistHandle; // persist handle or not
int8_t hasEpSet;
STraceId traceId;
// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
......@@ -58,7 +56,8 @@ typedef struct SRpcHandleInfo {
void *rsp;
int32_t rspLen;
// conn info
STraceId traceId;
SRpcConnInfo conn;
} SRpcHandleInfo;
......
......@@ -32,6 +32,7 @@ extern "C" {
typedef struct TdCmd *TdCmdPtr;
TdCmdPtr taosOpenCmd(const char* cmd);
int64_t taosGetsCmd(TdCmdPtr pCmd, int32_t maxSize, char *__restrict buf);
int64_t taosGetLineCmd(TdCmdPtr pCmd, char** __restrict ptrBuf);
int32_t taosEOFCmd(TdCmdPtr pCmd);
int64_t taosCloseCmd(TdCmdPtr* ppCmd);
......
......@@ -396,7 +396,7 @@ typedef enum ELogicConditionType {
#ifdef WINDOWS
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
#else
#define TSDB_MAX_RPC_THREADS 5
#define TSDB_MAX_RPC_THREADS 10
#endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
......
......@@ -11,13 +11,13 @@ if !%2==! GOTO USAGE
if "%1" == "cluster" (
set work_dir=%internal_dir%
set packagServerName_x64=TDengine-enterprise-server-%2-beta-Windows-x64
set packagServerName_x86=TDengine-enterprise-server-%2-beta-Windows-x86
@REM set packagServerName_x86=TDengine-enterprise-server-%2-beta-Windows-x86
set packagClientName_x64=TDengine-enterprise-client-%2-beta-Windows-x64
set packagClientName_x86=TDengine-enterprise-client-%2-beta-Windows-x86
) else (
set work_dir=%community_dir%
set packagServerName_x64=TDengine-server-%2-Windows-x64
set packagServerName_x86=TDengine-server-%2-Windows-x86
@REM set packagServerName_x86=TDengine-server-%2-Windows-x86
set packagClientName_x64=TDengine-client-%2-Windows-x64
set packagClientName_x86=TDengine-client-%2-Windows-x86
)
......@@ -59,8 +59,8 @@ rd /s /Q C:\TDengine
cmake --install .
if not %errorlevel% == 0 ( call :RUNFAILED build x86 failed & exit /b 1)
cd %package_dir%
iscc /DMyAppInstallName="%packagServerName_x86%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x86% failed & exit /b 1)
@REM iscc /DMyAppInstallName="%packagServerName_x86%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release
@REM if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x86% failed & exit /b 1)
iscc /DMyAppInstallName="%packagClientName_x86%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x86% failed & exit /b 1)
......
......@@ -199,7 +199,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
// send msg
syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg);
syncAppendEntriesBatchDestroy(pMsg);
// speed up
if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) {
......@@ -216,6 +215,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
} while (0);
#endif
}
syncAppendEntriesBatchDestroy(pMsg);
}
return ret;
......
......@@ -128,7 +128,7 @@ typedef struct {
int8_t retryCnt;
int8_t retryLimit;
// bool setMaxRetry;
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
......@@ -195,17 +195,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define transLabel(trans) ((STrans*)trans)->label
// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
//
// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
// bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transFreeMsg(void* msg);
//
typedef struct SConnBuffer {
char* buf;
......@@ -323,7 +313,7 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list
typedef struct STransReq {
queue q;
void* data;
uv_write_t wreq;
} STransReq;
void transReqQueueInit(queue* q);
......
......@@ -17,6 +17,7 @@
typedef struct SConnList {
queue conn;
int32_t size;
} SConnList;
typedef struct SCliConn {
......@@ -339,8 +340,8 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
}
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
......@@ -510,7 +511,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
SHashObj* pPool = pool;
SConnList* plist = taosHashGet(pPool, key, strlen(key));
if (plist == NULL) {
SConnList list;
SConnList list = {0};
taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pPool, key, strlen(key));
QUEUE_INIT(&plist->conn);
......@@ -519,15 +520,18 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
if (QUEUE_IS_EMPTY(&plist->conn)) {
return NULL;
}
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
}
return conn;
}
static void addConnToPool(void* pool, SCliConn* conn) {
......@@ -539,6 +543,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
allocConnRef(conn, true);
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(thrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
STrans* pTransInst = thrd->pTransInst;
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
......@@ -556,13 +567,17 @@ static void addConnToPool(void* pool, SCliConn* conn) {
assert(conn->list != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&conn->list->conn, &conn->q);
conn->list->size += 1;
conn->task = NULL;
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
}
static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) {
......@@ -1374,7 +1389,7 @@ int transReleaseCliHandle(void* handle) {
}
STransMsg tmsg = {.info.handle = handle};
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
// TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
......@@ -1415,7 +1430,6 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
if (ctx != NULL) {
pCtx->appCtx = *ctx;
}
assert(pTransInst->connType == TAOS_CONN_CLIENT);
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx;
......
......@@ -23,33 +23,6 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt;
static int32_t instMgt;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
int ret = -1;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
return ret;
}
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));
}
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont);
......@@ -176,7 +149,6 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
* info--->|
*/
SConnBuffer* p = connBuf;
uvBuf->base = p->buf + p->len;
if (p->left == -1) {
uvBuf->len = p->cap - p->len;
......@@ -184,7 +156,8 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
if (p->left < p->cap - p->len) {
uvBuf->len = p->left;
} else {
p->buf = taosMemoryRealloc(p->buf, p->left + p->len);
p->cap = p->left + p->len;
p->buf = taosMemoryRealloc(p->buf, p->cap);
uvBuf->base = p->buf + p->len;
uvBuf->len = p->left;
}
......@@ -266,14 +239,9 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
uv_async_t* async = &(pool->asyncs[idx]);
SAsyncItem* item = async->data;
int64_t st = taosGetTimestampUs();
taosThreadMutexLock(&item->mtx);
QUEUE_PUSH(&item->qmsg, q);
taosThreadMutexUnlock(&item->mtx);
int64_t el = taosGetTimestampUs() - st;
if (el > 50) {
// tInfo("lock and unlock cost:%d", (int)el);
}
return uv_async_send(async);
}
......@@ -349,30 +317,21 @@ void transReqQueueInit(queue* q) {
QUEUE_INIT(q);
}
void* transReqQueuePush(queue* q) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req;
req->data = wreq;
QUEUE_PUSH(q, &wreq->q);
return req;
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
req->wreq.data = req;
QUEUE_PUSH(q, &req->q);
return &req->wreq;
}
void* transReqQueueRemove(void* arg) {
void* ret = NULL;
uv_write_t* req = arg;
STransReq* wreq = req && req->data ? req->data : NULL;
uv_write_t* wreq = arg;
assert(wreq->data == req);
if (wreq == NULL || wreq->data == NULL) {
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
return req;
}
QUEUE_REMOVE(&wreq->q);
STransReq* req = wreq ? wreq->data : NULL;
if (req == NULL) return NULL;
QUEUE_REMOVE(&req->q);
ret = req && req->handle ? req->handle->data : NULL;
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
ret = wreq && wreq->handle ? wreq->handle->data : NULL;
taosMemoryFree(req);
return ret;
}
......@@ -381,7 +340,6 @@ void transReqQueueClear(queue* q) {
queue* h = QUEUE_HEAD(q);
QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q);
taosMemoryFree(req->data);
taosMemoryFree(req);
}
}
......
......@@ -75,7 +75,6 @@ typedef struct SWorkThrd {
SAsyncPool* asyncPool;
uv_prepare_t* prepare;
queue msg;
TdThreadMutex msgMtx;
queue conn;
void* pTransInst;
......@@ -499,6 +498,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd);
......@@ -743,7 +743,6 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->msg);
taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
......
......@@ -75,15 +75,14 @@ void processShellMsg() {
void *handle = pRpcMsg->info.handle;
taosFreeQitem(pRpcMsg);
{
SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize;
nRpcMsg.info.handle = handle;
nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
rpcSendResponse(&nRpcMsg);
}
//{
// SRpcMsg nRpcMsg = {0};
// nRpcMsg.pCont = rpcMallocCont(msgSize);
// nRpcMsg.contLen = msgSize;
// nRpcMsg.info.handle = handle;
// nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
// rpcSendResponse(&nRpcMsg);
//}
}
taosUpdateItemSize(qinfo.queue, numOfMsgs);
......
......@@ -16,6 +16,7 @@
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#include "tdef.h"
#include "pthread.h"
#ifdef WINDOWS
......@@ -57,7 +58,7 @@ int32_t taosGetAppName(char* name, int32_t* len) {
end = filepath;
}
strcpy(name, end);
tstrncpy(name, end, TSDB_APP_NAME_LEN);
if (len != NULL) {
*len = (int32_t)strlen(end);
......@@ -625,7 +626,7 @@ int32_t taosGetAppName(char *name, int32_t *len) {
buf[PATH_MAX] = '\0';
size_t n = strlen(buf);
if (len) *len = n;
if (name) strcpy(name, buf);
if (name) tstrncpy(name, buf, TSDB_APP_NAME_LEN);
return 0;
}
......@@ -668,7 +669,7 @@ int32_t taosGetAppName(char* name, int32_t* len) {
++end;
strcpy(name, end);
tstrncpy(name, end, TSDB_APP_NAME_LEN);
if (len != NULL) {
*len = strlen(name);
......
......@@ -398,7 +398,7 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
if (line != NULL) taosMemoryFree(line);
taosCloseFile(&pFile);
if (code != 0) {
if (code != 0 && (done & 1) == 0) {
TdFilePtr pFile1 = taosOpenFile("/proc/device-tree/model", TD_FILE_READ | TD_FILE_STREAM);
if (pFile1 == NULL) return code;
taosGetsFile(pFile1, maxLen, cpuModel);
......@@ -407,6 +407,16 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
done |= 1;
}
if (code != 0 && (done & 1) == 0) {
TdCmdPtr pCmd = taosOpenCmd("uname -a");
if (pCmd == NULL) return code;
if (taosGetsCmd(pCmd, maxLen, cpuModel) > 0) {
code = 0;
done |= 1;
}
taosCloseCmd(&pCmd);
}
if ((done & 2) == 0) {
*numOfCores = coreCount;
done |= 2;
......
......@@ -248,6 +248,16 @@ TdCmdPtr taosOpenCmd(const char* cmd) {
#endif
}
int64_t taosGetsCmd(TdCmdPtr pCmd, int32_t maxSize, char *__restrict buf) {
if (pCmd == NULL || buf == NULL) {
return -1;
}
if (fgets(buf, maxSize, (FILE*)pCmd) == NULL) {
return -1;
}
return strlen(buf);
}
int64_t taosGetLineCmd(TdCmdPtr pCmd, char** __restrict ptrBuf) {
if (pCmd == NULL || ptrBuf == NULL) {
return -1;
......
......@@ -332,8 +332,8 @@
./test.sh -f tsim/vnode/stable_replica3_vnode3.sim
# --- sync
#./test.sh -f tsim/sync/3Replica1VgElect.sim
#./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/3Replica1VgElect.sim
./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/oneReplica1VgElect.sim
./test.sh -f tsim/sync/oneReplica5VgElect.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册