diff --git a/cmake/define.inc b/cmake/define.inc
index 8d6a3987099cbb11b3544d76bd5026ca2473d362..93bf6026106e4e2c6e788d2949446ab54b26813f 100755
--- a/cmake/define.inc
+++ b/cmake/define.inc
@@ -28,3 +28,7 @@ ENDIF ()
IF (TD_RANDOM_FILE_FAIL)
ADD_DEFINITIONS(-DTAOS_RANDOM_FILE_FAIL)
ENDIF ()
+
+IF (TD_RANDOM_NETWORK_FAIL)
+ ADD_DEFINITIONS(-DTAOS_RANDOM_NETWORK_FAIL)
+ENDIF ()
diff --git a/cmake/input.inc b/cmake/input.inc
index 574eac5b455dd3371a38eefbe57698c8ec328cad..e963e202400aa759962bd300138cca3b04962dc6 100755
--- a/cmake/input.inc
+++ b/cmake/input.inc
@@ -36,3 +36,8 @@ IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET(TD_RANDOM_FILE_FAIL TRUE)
MESSAGE(STATUS "build with random-file-fail enabled")
ENDIF ()
+
+IF (${RANDOM_NETWORK_FAIL} MATCHES "true")
+ SET(TD_RANDOM_NETWORK_FAIL TRUE)
+ MESSAGE(STATUS "build with random-network-fail enabled")
+ENDIF ()
diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c
index f6983fcb3506eba75a947d32bee6fb69e4ab704f..30729b06ca31f39c8c2820e69951d1eaa8a5f5db 100644
--- a/src/dnode/src/dnodeMgmt.c
+++ b/src/dnode/src/dnodeMgmt.c
@@ -106,6 +106,12 @@ int32_t dnodeInitMgmt() {
}
}
+ int32_t code = vnodeInitResources();
+ if (code != TSDB_CODE_SUCCESS) {
+ dnodeCleanupMgmt();
+ return -1;
+ }
+
// create the queue and thread to handle the message
tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) {
@@ -127,7 +133,7 @@ int32_t dnodeInitMgmt() {
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
- int32_t code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
+ code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
@@ -282,13 +288,12 @@ static void *dnodeOpenVnode(void *param) {
}
static int32_t dnodeOpenVnodes() {
- int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
+ int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
- free(vnodeList);
return status;
}
@@ -334,7 +339,6 @@ static int32_t dnodeOpenVnodes() {
free(pThread->vnodeList);
}
- free(vnodeList);
free(threads);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
@@ -342,7 +346,7 @@ static int32_t dnodeOpenVnodes() {
}
void dnodeStartStream() {
- int32_t vnodeList[TSDB_MAX_VNODES];
+ int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
@@ -359,7 +363,7 @@ void dnodeStartStream() {
}
static void dnodeCloseVnodes() {
- int32_t vnodeList[TSDB_MAX_VNODES];
+ int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0;
int32_t status;
diff --git a/src/inc/vnode.h b/src/inc/vnode.h
index a034bc5706b3c7b3f4826ad0a35382503386669c..151ea3f56352272a1c852c476983a97e2a424593 100644
--- a/src/inc/vnode.h
+++ b/src/inc/vnode.h
@@ -61,6 +61,8 @@ int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes);
+
+int32_t vnodeInitResources();
void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h
index 58e255f7bc95ed0b1bf322554980c0efe432d9e9..00b9f33f1bb3c2443a2ed25097a3a7215c489666 100644
--- a/src/os/linux/inc/os.h
+++ b/src/os/linux/inc/os.h
@@ -86,9 +86,28 @@ extern "C" {
} \
}
+#ifdef TAOS_RANDOM_NETWORK_FAIL
+
+ssize_t taos_send_random_fail(int sockfd, const void *buf, size_t len, int flags);
+
+ssize_t taos_sendto_random_fail(int sockfd, const void *buf, size_t len, int flags,
+ const struct sockaddr *dest_addr, socklen_t addrlen);
+ssize_t taos_read_random_fail(int fd, void *buf, size_t count);
+ssize_t taos_write_random_fail(int fd, const void *buf, size_t count);
+
+#define send(sockfd, buf, len, flags) taos_send_random_fail(sockfd, buf, len, flags)
+#define sendto(sockfd, buf, len, flags, dest_addr, addrlen) \
+ taos_sendto_random_fail(sockfd, buf, len, flags, dest_addr, addrlen)
+#define taosWriteSocket(fd, buf, len) taos_write_random_fail(fd, buf, len)
+#define taosReadSocket(fd, buf, len) taos_read_random_fail(fd, buf, len)
+
+#else
+
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
+#endif /* TAOS_RANDOM_NETWORK_FAIL */
+
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
diff --git a/src/os/linux/src/linuxPlatform.c b/src/os/linux/src/linuxPlatform.c
index 9a38c98f81c36ad779c990fd40ca1d7395ffd3f8..216d8942bcac8953ecfd753658e71f6c2a422f4e 100644
--- a/src/os/linux/src/linuxPlatform.c
+++ b/src/os/linux/src/linuxPlatform.c
@@ -270,3 +270,49 @@ int tSystem(const char * cmd)
}
}
+#ifdef TAOS_RANDOM_NETWORK_FAIL
+
+#define RANDOM_NETWORK_FAIL_FACTOR 20
+
+ssize_t taos_send_random_fail(int sockfd, const void *buf, size_t len, int flags)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ return send(sockfd, buf, len, flags);
+}
+
+ssize_t taos_sendto_random_fail(int sockfd, const void *buf, size_t len, int flags,
+ const struct sockaddr *dest_addr, socklen_t addrlen)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
+}
+
+ssize_t taos_read_random_fail(int fd, void *buf, size_t count)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ return read(fd, buf, count);
+}
+
+ssize_t taos_write_random_fail(int fd, const void *buf, size_t count)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = EINTR;
+ return -1;
+ }
+
+ return write(fd, buf, count);
+}
+
+#endif /* TAOS_RANDOM_NETWORK_FAIL */
diff --git a/src/os/linux/src/linuxSysPara.c b/src/os/linux/src/linuxSysPara.c
index c2134765dfefba5cde330cb6a835e0c7c5261028..1331503619cd6295221bb9107334829d41e96fe3 100644
--- a/src/os/linux/src/linuxSysPara.c
+++ b/src/os/linux/src/linuxSysPara.c
@@ -160,7 +160,7 @@ static void taosGetSystemTimezone() {
/* load time zone string from /etc/timezone */
FILE *f = fopen("/etc/timezone", "r");
- char buf[65] = {0};
+ char buf[68] = {0};
if (f != NULL) {
int len = fread(buf, 64, 1, f);
if(len < 64 && ferror(f)) {
@@ -170,18 +170,17 @@ static void taosGetSystemTimezone() {
}
fclose(f);
- }
- char *lineEnd = strstr(buf, "\n");
- if (lineEnd != NULL) {
- *lineEnd = 0;
- }
+ char *lineEnd = strstr(buf, "\n");
+ if (lineEnd != NULL) {
+ *lineEnd = 0;
+ }
- // for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
- if (strlen(buf) > 0) {
- setenv("TZ", buf, 1);
+ // for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
+ if (strlen(buf) > 0) {
+ setenv("TZ", buf, 1);
+ }
}
-
// get and set default timezone
tzset();
diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c
index e4f364d3d340a3407e01477f6342c1e553a152e8..c05c8c76e16fc96d5a513608e26fcd24956bc7b6 100644
--- a/src/rpc/src/rpcMain.c
+++ b/src/rpc/src/rpcMain.c
@@ -73,6 +73,7 @@ typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo
SRpcIpSet ipSet; // ip list provided by app
void *ahandle; // handle provided by app
+ void *signature; // for validation
struct SRpcConn *pConn; // pConn allocated
char msgType; // message type
uint8_t *pCont; // content provided by app
@@ -361,6 +362,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
pContext->ahandle = pMsg->ahandle;
+ pContext->signature = pContext;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = *pIpSet;
pContext->contLen = contLen;
@@ -527,11 +529,13 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
return code;
}
-/* todo: cancel process may have race condition, pContext may have been released
- just before app calls the rpcCancelRequest */
void rpcCancelRequest(void *handle) {
SRpcReqContext *pContext = handle;
+ // signature is used to check if pContext is freed.
+ // pContext may have been released just before app calls the rpcCancelRequest
+ if (pContext->signature != pContext) return;
+
if (pContext->pConn) {
tDebug("%s, app trys to cancel request", pContext->pConn->info);
rpcCloseConn(pContext->pConn);
@@ -1005,6 +1009,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
SRpcInfo *pRpc = pContext->pRpc;
+ pContext->signature = NULL;
pContext->pConn = NULL;
if (pContext->pRsp) {
// for synchronous API
diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c
index 97eeda010eb27c5b95b4308602268ff7c62752ea..eb7a2d5a66b8923a52f40930878ab0aec20262ba 100644
--- a/src/util/src/tfile.c
+++ b/src/util/src/tfile.c
@@ -26,12 +26,12 @@
#include "os.h"
-#define RANDOM_FACTOR 5
+#define RANDOM_FILE_FAIL_FACTOR 5
ssize_t taos_tread(int fd, void *buf, size_t count)
{
#ifdef TAOS_RANDOM_FILE_FAIL
- if (rand() % RANDOM_FACTOR == 0) {
+ if (rand() % RANDOM_FILE_FAIL_FACTOR == 0) {
errno = EIO;
return -1;
}
@@ -43,7 +43,7 @@ ssize_t taos_tread(int fd, void *buf, size_t count)
ssize_t taos_twrite(int fd, void *buf, size_t count)
{
#ifdef TAOS_RANDOM_FILE_FAIL
- if (rand() % RANDOM_FACTOR == 0) {
+ if (rand() % RANDOM_FILE_FAIL_FACTOR == 0) {
errno = EIO;
return -1;
}
@@ -55,7 +55,7 @@ ssize_t taos_twrite(int fd, void *buf, size_t count)
off_t taos_lseek(int fd, off_t offset, int whence)
{
#ifdef TAOS_RANDOM_FILE_FAIL
- if (rand() % RANDOM_FACTOR == 0) {
+ if (rand() % RANDOM_FILE_FAIL_FACTOR == 0) {
errno = EIO;
return -1;
}
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 0050de33994c48a89abcb107e350d1dec7e2527c..b469a5836ef21b1c804df109bce35af21644748c 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -34,8 +34,7 @@
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
-static int32_t tsOpennedVnodes;
-static void *tsDnodeVnodesHash;
+static SHashObj*tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
@@ -47,8 +46,6 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
-static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
-
#ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
@@ -58,25 +55,28 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif
-static void vnodeInit() {
+int32_t vnodeInitResources() {
vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosHashInit(TSDB_MAX_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
if (tsDnodeVnodesHash == NULL) {
vError("failed to init vnode list");
+ return TSDB_CODE_VND_OUT_OF_MEMORY;
}
+
+ return TSDB_CODE_SUCCESS;
}
void vnodeCleanupResources() {
- taosHashCleanup(tsDnodeVnodesHash);
- vnodeModuleInit = PTHREAD_ONCE_INIT;
- tsDnodeVnodesHash = NULL;
+ if (tsDnodeVnodesHash != NULL) {
+ taosHashCleanup(tsDnodeVnodesHash);
+ tsDnodeVnodesHash = NULL;
+ }
}
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
- pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t));
if (pTemp != NULL) {
@@ -144,11 +144,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
int32_t vnodeDrop(int32_t vgId) {
- if (tsDnodeVnodesHash == NULL) {
- vDebug("vgId:%d, failed to drop, vgId not exist", vgId);
- return TSDB_CODE_VND_INVALID_VGROUP_ID;
- }
-
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
vDebug("vgId:%d, failed to drop, vgId not find", vgId);
@@ -187,7 +182,6 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
- pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
if (pVnode == NULL) {
@@ -195,7 +189,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return TAOS_SYSTEM_ERROR(errno);
}
- atomic_add_fetch_32(&tsOpennedVnodes, 1);
atomic_add_fetch_32(&pVnode->refCount, 1);
pVnode->vgId = vnode;
@@ -366,13 +359,11 @@ void vnodeRelease(void *pVnodeRaw) {
free(pVnode);
- int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
+ int32_t count = taosHashGetSize(tsDnodeVnodesHash);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
}
void *vnodeGetVnode(int32_t vgId) {
- if (tsDnodeVnodesHash == NULL) return NULL;
-
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
@@ -434,8 +425,6 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
- if (tsDnodeVnodesHash == NULL) return TSDB_CODE_SUCCESS;
-
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash);
while (taosHashIterNext(pIter)) {
SVnodeObj **pVnode = taosHashIterGet(pIter);
diff --git a/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml b/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
index f6728359e5e58303d48b21c71325d9c0d5c6fe89..4e307db07935be293e4ec74566aa158fb1fece9e 100644
--- a/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
+++ b/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
@@ -94,7 +94,7 @@
com.google.guava
guava
- 18.0
+ 24.1.1