diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c
index b28adfaaca4426873d1c12c250ae1c55bc002938..12be3deb2e33aba9be9b45acd1595a749ab1b2c5 100644
--- a/contrib/test/craft/raftMain.c
+++ b/contrib/test/craft/raftMain.c
@@ -377,7 +377,7 @@ void printConf(SRaftServerConfig *pConf) {
int main(int argc, char **argv) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
int32_t ret;
exe_name = argv[0];
diff --git a/contrib/test/craft/simulate_vnode.c b/contrib/test/craft/simulate_vnode.c
index 668fe638b719bacfb9f320b40767eac4fe6da71a..7ee9b9f8f01f14e48da956059e5b12422eba3f4f 100644
--- a/contrib/test/craft/simulate_vnode.c
+++ b/contrib/test/craft/simulate_vnode.c
@@ -132,7 +132,7 @@ static void proposeValue(struct raft *r) {
buf.base = raft_malloc(buf.len);
// mock ts value
- int vid = rand() % VNODE_COUNT;
+ int vid = taosRand() % VNODE_COUNT;
snprintf(buf.base, buf.len, "%d:value_%ld", vid, time(NULL));
printf("propose value: %s \n", (char*)buf.base);
@@ -174,7 +174,7 @@ void usage() {
}
int main(int argc, char **argv) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
exe_name = argv[0];
if (argc < 2) {
diff --git a/examples/c/schemaless.c b/examples/c/schemaless.c
index 21f39213cd557f3784a9c1b83172b978e3982683..99aa361b0a8801ff72467ed6f644262998f3c5b4 100644
--- a/examples/c/schemaless.c
+++ b/examples/c/schemaless.c
@@ -19,7 +19,7 @@ void shuffle(char**lines, size_t n)
size_t i;
for (i = 0; i < n - 1; i++)
{
- size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
+ size_t j = i + taosRand() / (RAND_MAX / (n - i) + 1);
char* t = lines[j];
lines[j] = lines[i];
lines[i] = t;
diff --git a/include/os/osRand.h b/include/os/osRand.h
index 422ea92a71feabfae497e319879704af89bd655d..09e1f1b41d36dceac1530c961775c77ba6d20fdb 100644
--- a/include/os/osRand.h
+++ b/include/os/osRand.h
@@ -20,7 +20,16 @@
extern "C" {
#endif
+// If the error is in a third-party library, place this header file under the third-party library header file.
+#ifndef ALLOW_FORBID_FUNC
+ #define rand RAND_FUNC_TAOS_FORBID
+ #define srand SRAND_FUNC_TAOS_FORBID
+ #define rand_r RANDR_FUNC_TAOS_FORBID
+#endif
+
+void taosSeedRand(uint32_t seed);
uint32_t taosRand(void);
+uint32_t taosRandR(uint32_t *pSeed);
void taosRandStr(char* str, int32_t size);
uint32_t taosSafeRand(void);
diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index 08285c9d26f923c7ebfb420e7221c182e728313d..f5cc034e095b03b0b28dd9a735bc01ec3f93019a 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -208,7 +208,7 @@ void taos_init_imp(void) {
atexit(taos_cleanup);
errno = TSDB_CODE_SUCCESS;
- srand(taosGetTimestampSec());
+ taosSeedRand(taosGetTimestampSec());
deltaToUtcInitOnce();
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 319bd9fde67b49112c968c75c7177b8453a2b66c..b6dd90a4e2586f44a3865e73ef335be92e778919 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -59,7 +59,7 @@ typedef struct {
SWalCfg walCfg;
uint32_t hashBegin;
uint32_t hashEnd;
- int8_t hashMethod;
+ int8_t hashMethod;
} SVnodeCfg;
typedef struct {
@@ -202,6 +202,22 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ READ --------------------------- */
+enum {
+ TQ_STREAM_TOKEN__DATA = 1,
+ TQ_STREAM_TOKEN__WATERMARK,
+ TQ_STREAM_TOKEN__CHECKPOINT,
+};
+
+typedef struct {
+ int8_t type;
+ int8_t reserved[7];
+ union {
+ void *data;
+ int64_t wmTs;
+ int64_t checkpointId;
+ };
+} STqStreamToken;
+
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) {
diff --git a/source/dnode/vnode/src/inc/tqPush.h b/source/dnode/vnode/src/inc/tqPush.h
index 32fd7c3ddf4cd5efd0a5125ae67865afbadfb580..a6121c5dc1b7e5b29626a2b6cb59bd9bd63b40b9 100644
--- a/source/dnode/vnode/src/inc/tqPush.h
+++ b/source/dnode/vnode/src/inc/tqPush.h
@@ -16,9 +16,11 @@
#ifndef _TQ_PUSH_H_
#define _TQ_PUSH_H_
+#include "executor.h"
#include "thash.h"
#include "trpc.h"
#include "ttimer.h"
+#include "vnode.h"
#ifdef __cplusplus
extern "C" {
@@ -39,11 +41,12 @@ typedef struct {
} STqClientPusher;
typedef struct {
- int8_t type;
- int8_t nodeType;
- int8_t reserved[6];
- int64_t streamId;
- SEpSet epSet;
+ int8_t type;
+ int8_t nodeType;
+ int8_t reserved[6];
+ int64_t streamId;
+ qTaskInfo_t task;
+ // TODO sync function
} STqStreamPusher;
typedef struct {
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 16809f1527b05eceafd3c54507a8d029145ff2f3..d15481b4aa54c6c5009f0030eca63b0685a749ec 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -67,6 +67,26 @@ void tqClose(STQ* pTq) {
}
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
+ if (msgType != TDMT_VND_SUBMIT) return 0;
+ void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
+ while (pIter != NULL) {
+ STqPusher* pusher = *(STqPusher**)pIter;
+ if (pusher->type == TQ_PUSHER_TYPE__STREAM) {
+ STqStreamPusher* streamPusher = (STqStreamPusher*)pusher;
+ // repack
+ STqStreamToken* token = malloc(sizeof(STqStreamToken));
+ if (token == NULL) {
+ taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter);
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+ token->type = TQ_STREAM_TOKEN__DATA;
+ token->data = msg;
+ // set input
+ // exec
+ }
+ // send msg to ep
+ }
// iterate hash
// process all msg
// if waiting
diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c
index fea65846be749d2fe73f5319477a989e86e2cc33..4186f29e2aa7111ce53f143247bbaf5f15018d66 100644
--- a/source/dnode/vnode/src/tq/tqPush.c
+++ b/source/dnode/vnode/src/tq/tqPush.c
@@ -73,7 +73,7 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet
streamPusher->type = TQ_PUSHER_TYPE__STREAM;
streamPusher->nodeType = 0;
streamPusher->streamId = streamId;
- memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));
+ /*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/
if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c
index 2f24df03099a728afe30c1b8383e68e2b0face62..92a111298fa093699dd75cbadec5846d11fefa44 100644
--- a/source/dnode/vnode/src/tq/tqRead.c
+++ b/source/dnode/vnode/src/tq/tqRead.c
@@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#define _DEFAULT_SOURCE
#include "vnode.h"
@@ -37,6 +36,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
+ // iterate and convert
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
diff --git a/source/dnode/vnode/test/tqMetaTest.cpp b/source/dnode/vnode/test/tqMetaTest.cpp
index d3c9b50e4abd884903f50b2bf5d10bd8ec71eb00..4f1518525476127241401741c371f0a8071dc6d9 100644
--- a/source/dnode/vnode/test/tqMetaTest.cpp
+++ b/source/dnode/vnode/test/tqMetaTest.cpp
@@ -168,10 +168,10 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) {
}
TEST_F(TqMetaUpdateAppendTest, multiplePage) {
- srand(0);
+ taosSeedRand(0);
std::vector v;
for (int i = 0; i < 1000; i++) {
- v.push_back(rand());
+ v.push_back(taosRand());
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
@@ -202,10 +202,10 @@ TEST_F(TqMetaUpdateAppendTest, multiplePage) {
}
TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
- srand(0);
+ taosSeedRand(0);
std::vector v;
for (int i = 0; i < 1000; i++) {
- v.push_back(rand());
+ v.push_back(taosRand());
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
@@ -213,14 +213,14 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
for (int i = 0; i < 500; i++) {
tqHandleCommit(pMeta, i);
- v[i] = rand();
+ v[i] = taosRand();
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
}
for (int i = 500; i < 1000; i++) {
- v[i] = rand();
+ v[i] = taosRand();
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
@@ -235,7 +235,7 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
ASSERT(pMeta);
for (int i = 500; i < 1000; i++) {
- v[i] = rand();
+ v[i] = taosRand();
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
@@ -250,10 +250,10 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
}
TEST_F(TqMetaUpdateAppendTest, dupCommit) {
- srand(0);
+ taosSeedRand(0);
std::vector v;
for (int i = 0; i < 1000; i++) {
- v.push_back(rand());
+ v.push_back(taosRand());
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp
index c7867c4da5be3b7ab06a13370050a5c6d43b28ef..00f6a508b71677eb8d43cbe574e3e180f12f7035 100644
--- a/source/libs/catalog/test/catalogTests.cpp
+++ b/source/libs/catalog/test/catalogTests.cpp
@@ -723,7 +723,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
}
if (ctgTestEnableSleep) {
- usleep(rand() % 5);
+ usleep(taosRand() % 5);
}
if (++n % ctgTestPrintNum == 0) {
printf("Get:%d\n", n);
@@ -747,7 +747,7 @@ void *ctgTestSetSameDbVgroupThread(void *param) {
}
if (ctgTestEnableSleep) {
- usleep(rand() % 5);
+ usleep(taosRand() % 5);
}
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
@@ -771,7 +771,7 @@ void *ctgTestSetDiffDbVgroupThread(void *param) {
}
if (ctgTestEnableSleep) {
- usleep(rand() % 5);
+ usleep(taosRand() % 5);
}
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
@@ -801,7 +801,7 @@ void *ctgTestGetCtableMetaThread(void *param) {
tfree(tbMeta);
if (ctgTestEnableSleep) {
- usleep(rand() % 5);
+ usleep(taosRand() % 5);
}
if (++n % ctgTestPrintNum == 0) {
@@ -838,7 +838,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
}
if (ctgTestEnableSleep) {
- usleep(rand() % 5);
+ usleep(taosRand() % 5);
}
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index eeb48a1f3db6e587770f8230f5cf6996ebeb11a6..e30b51cbdf4c48e6fb3404feabf95f868c1f9e48 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -61,7 +61,7 @@ typedef enum SResultTsInterpType {
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
- uint32_t v = rand();
+ uint32_t v = taosRand();
if (v % 1000 <= 0) {
return NULL;
@@ -71,7 +71,7 @@ static UNUSED_FUNC void *u_malloc (size_t __size) {
}
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
- uint32_t v = rand();
+ uint32_t v = taosRand();
if (v % 1000 <= 0) {
return NULL;
} else {
@@ -80,7 +80,7 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
}
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
- uint32_t v = rand();
+ uint32_t v = taosRand();
if (v % 5 <= 1) {
return NULL;
} else {
diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp
index ff29d5f3555613b08911fbb8d1b39cc6cab9b645..72ef13124ad222e42a64d1519a78a8f75d44adf0 100644
--- a/source/libs/executor/test/executorTests.cpp
+++ b/source/libs/executor/test/executorTests.cpp
@@ -869,7 +869,7 @@ TEST(testCase, external_sort_Test) {
#if 0
su* v = static_cast(calloc(1000000, sizeof(su)));
for(int32_t i = 0; i < 1000000; ++i) {
- v[i].v = rand();
+ v[i].v = taosRand();
v[i].c = static_cast(malloc(4));
*(int32_t*) v[i].c = i;
}
@@ -882,7 +882,7 @@ TEST(testCase, external_sort_Test) {
return;
#endif
- srand(time(NULL));
+ taosSeedRand(time(NULL));
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
@@ -943,7 +943,7 @@ TEST(testCase, external_sort_Test) {
}
TEST(testCase, sorted_merge_Test) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
@@ -1015,7 +1015,7 @@ TEST(testCase, sorted_merge_Test) {
}
TEST(testCase, time_interval_Operator_Test) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp
index 66ef3b0877d67f3967a1b642e03d84c4e979c8d5..88cf713727abde0f6eada6c00257734e2b989bd9 100644
--- a/source/libs/executor/test/lhashTests.cpp
+++ b/source/libs/executor/test/lhashTests.cpp
@@ -25,7 +25,7 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
TEST(testCase, linear_hash_Tests) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
#if 0
diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc
index ce3f7fe25e3e1b58cdf50fe6fda861edd5795b9e..699c785be5a53a86c5e07956d50fb12ee10e3623 100644
--- a/source/libs/index/test/indexTests.cc
+++ b/source/libs/index/test/indexTests.cc
@@ -699,7 +699,7 @@ class IndexObj {
for (int i = 0; i < numOfTable; i++) {
for (int k = 0; k < 10 && k < colVal.size(); k++) {
// opt
- tColVal[rand() % colValSize] = 'a' + k % 26;
+ tColVal[taosRand() % colValSize] = 'a' + k % 26;
}
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
tColVal.c_str(), tColVal.size());
diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp
index 231f0c7fffdf989e361e63b6a4f5a5a8a39b4759..8658c4cead00fff30f567ea87adeda3ea9a5cd7c 100644
--- a/source/libs/qworker/test/qworkerTests.cpp
+++ b/source/libs/qworker/test/qworkerTests.cpp
@@ -266,7 +266,7 @@ int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTas
int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum);
qwtTestSinkBlockNum = 0;
- qwtTestSinkMaxBlockNum = rand() % 100 + 1;
+ qwtTestSinkMaxBlockNum = taosRand() % 100 + 1;
qwtTestSinkQueryEnd = false;
if (0 == idx) {
@@ -295,15 +295,15 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
} else {
if (qwtTestSinkQueryEnd) {
*pRes = NULL;
- *useconds = rand() % 10;
+ *useconds = taosRand() % 10;
return 0;
}
- endExec = rand() % 5;
+ endExec = taosRand() % 5;
int32_t runTime = 0;
if (qwtTestEnableSleep && qwtTestMaxExecTaskUsec > 0) {
- runTime = rand() % qwtTestMaxExecTaskUsec;
+ runTime = taosRand() % qwtTestMaxExecTaskUsec;
}
if (qwtTestEnableSleep) {
@@ -314,10 +314,10 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if (endExec) {
*pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock));
- (*pRes)->info.rows = rand() % 1000;
+ (*pRes)->info.rows = taosRand() % 1000;
} else {
*pRes = NULL;
- *useconds = rand() % 10;
+ *useconds = taosRand() % 10;
}
}
@@ -376,7 +376,7 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
taosWLockLatch(&qwtTestSinkLock);
if (qwtTestSinkBlockNum > 0) {
- *pLen = rand() % 100 + 1;
+ *pLen = taosRand() % 100 + 1;
qwtTestSinkBlockNum--;
} else {
*pLen = 0;
@@ -392,7 +392,7 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
taosWLockLatch(&qwtTestSinkLock);
if (qwtTestSinkLastLen > 0) {
- pOutput->numOfRows = rand() % 10 + 1;
+ pOutput->numOfRows = taosRand() % 10 + 1;
pOutput->compressed = 1;
pOutput->queryEnd = qwtTestSinkQueryEnd;
if (qwtTestSinkBlockNum == 0) {
@@ -402,7 +402,7 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
} else {
pOutput->bufStatus = DS_BUF_FULL;
}
- pOutput->useconds = rand() % 10 + 1;
+ pOutput->useconds = taosRand() % 10 + 1;
pOutput->precision = 1;
} else if (qwtTestSinkLastLen == 0) {
pOutput->numOfRows = 0;
@@ -416,7 +416,7 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
} else {
pOutput->bufStatus = DS_BUF_FULL;
}
- pOutput->useconds = rand() % 10 + 1;
+ pOutput->useconds = taosRand() % 10 + 1;
pOutput->precision = 1;
} else {
assert(0);
@@ -590,7 +590,7 @@ void *queryThread(void *param) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
if (qwtTestEnableSleep) {
- usleep(rand()%5);
+ usleep(taosRand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("query:%d\n", n);
@@ -612,7 +612,7 @@ void *readyThread(void *param) {
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
if (qwtTestEnableSleep) {
- usleep(rand()%5);
+ usleep(taosRand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("ready:%d\n", n);
@@ -634,7 +634,7 @@ void *fetchThread(void *param) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
if (qwtTestEnableSleep) {
- usleep(rand()%5);
+ usleep(taosRand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("fetch:%d\n", n);
@@ -656,7 +656,7 @@ void *dropThread(void *param) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
if (qwtTestEnableSleep) {
- usleep(rand()%5);
+ usleep(taosRand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("drop:%d\n", n);
@@ -678,7 +678,7 @@ void *statusThread(void *param) {
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
if (qwtTestEnableSleep) {
- usleep(rand()%5);
+ usleep(taosRand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("status:%d\n", n);
@@ -748,7 +748,7 @@ void *queryQueueThread(void *param) {
if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
- int32_t delay = rand() % qwtTestReqMaxDelayUsec;
+ int32_t delay = taosRand() % qwtTestReqMaxDelayUsec;
if (delay) {
usleep(delay);
@@ -804,7 +804,7 @@ void *fetchQueueThread(void *param) {
taosWUnLockLatch(&qwtTestFetchQueueLock);
if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
- int32_t delay = rand() % qwtTestReqMaxDelayUsec;
+ int32_t delay = taosRand() % qwtTestReqMaxDelayUsec;
if (delay) {
usleep(delay);
@@ -963,7 +963,7 @@ TEST(seqTest, randCase) {
stubSetRpcSendResponse();
stubSetCreateExecTask();
- srand(time(NULL));
+ taosSeedRand(time(NULL));
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
@@ -971,7 +971,7 @@ TEST(seqTest, randCase) {
int32_t t = 0;
int32_t maxr = 10001;
while (true) {
- int32_t r = rand() % maxr;
+ int32_t r = taosRand() % maxr;
if (r >= 0 && r < maxr/5) {
printf("Query,%d\n", t++);
@@ -1025,7 +1025,7 @@ TEST(seqTest, multithreadRand) {
stubSetStringToPlan();
stubSetRpcSendResponse();
- srand(time(NULL));
+ taosSeedRand(time(NULL));
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
@@ -1076,7 +1076,7 @@ TEST(rcTest, shortExecshortDelay) {
stubSetPutDataBlock();
stubSetGetDataBlock();
- srand(time(NULL));
+ taosSeedRand(time(NULL));
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
@@ -1157,7 +1157,7 @@ TEST(rcTest, longExecshortDelay) {
stubSetPutDataBlock();
stubSetGetDataBlock();
- srand(time(NULL));
+ taosSeedRand(time(NULL));
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
@@ -1240,7 +1240,7 @@ TEST(rcTest, shortExeclongDelay) {
stubSetPutDataBlock();
stubSetGetDataBlock();
- srand(time(NULL));
+ taosSeedRand(time(NULL));
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
@@ -1324,7 +1324,7 @@ TEST(rcTest, dropTest) {
stubSetPutDataBlock();
stubSetGetDataBlock();
- srand(time(NULL));
+ taosSeedRand(time(NULL));
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
@@ -1358,7 +1358,7 @@ TEST(rcTest, dropTest) {
int main(int argc, char** argv) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp
index 08210aa2f0d479d007f4b989f46a350aa2b3db49..c69a1eb247195ad8a53140570d6c4bd68996e999 100644
--- a/source/libs/scalar/test/filter/filterTests.cpp
+++ b/source/libs/scalar/test/filter/filterTests.cpp
@@ -1286,7 +1286,7 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
int main(int argc, char** argv) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp
index faf13f0a82cfa30327f26fb0809ec0420f870a88..6518dbec87620072ea3ad157cf89f552ce23849e 100644
--- a/source/libs/scalar/test/scalar/scalarTests.cpp
+++ b/source/libs/scalar/test/scalar/scalarTests.cpp
@@ -1427,7 +1427,7 @@ TEST(columnTest, greater_and_lower) {
int main(int argc, char** argv) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp
index 8ed963d875700353fc805b84d60c3e5530e058f7..f17dcbd1f5d04c29512251e8eb853c4eb3040265 100644
--- a/source/libs/scheduler/test/schedulerTests.cpp
+++ b/source/libs/scheduler/test/schedulerTests.cpp
@@ -532,7 +532,7 @@ void* schtRunJobThread(void *aa) {
void* schtFreeJobThread(void *aa) {
while (!schtTestStop) {
- usleep(rand() % 100);
+ usleep(taosRand() % 100);
schtFreeQueryJob(1);
}
}
@@ -713,7 +713,7 @@ TEST(multiThread, forceFree) {
}
int main(int argc, char** argv) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c
index 6917df1597a68f29d2436ce0da39ce17dc7e7bc7..fa58bda76f9a78314d26808c2528e73971039060 100644
--- a/source/libs/sync/src/syncEnv.c
+++ b/source/libs/sync/src/syncEnv.c
@@ -28,7 +28,7 @@ static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer);
int32_t syncEnvStart() {
int32_t ret;
- srand(time(NULL));
+ taosSeedRand(time(NULL));
gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
assert(gSyncEnv != NULL);
ret = doSyncEnvStart(gSyncEnv);
diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c
index d37c821a2475790ee5e2ba7c2e127de8f8d49617..7cb42c9f87dbffaffcc94e26aed5b26a965a4a11 100644
--- a/source/libs/sync/src/syncIO.c
+++ b/source/libs/sync/src/syncIO.c
@@ -44,7 +44,7 @@ int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL);
- srand(time(NULL));
+ taosSeedRand(time(NULL));
int32_t ret = syncIOStartInternal(gSyncIO);
assert(ret == 0);
diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c
index 3fb430a714bd8b7e1a390ca25cace75c1874f2c8..216dccd62c24d8e588956c23972c1069cfe6ba27 100644
--- a/source/libs/sync/src/syncUtil.c
+++ b/source/libs/sync/src/syncUtil.c
@@ -95,7 +95,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
// ---- misc ----
-int32_t syncUtilRand(int32_t max) { return rand() % max; }
+int32_t syncUtilRand(int32_t max) { return taosRand() % max; }
int32_t syncUtilElectRandomMS() { return ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
diff --git a/source/libs/tdb/src/db/tdbUtil.c b/source/libs/tdb/src/db/tdbUtil.c
index fe0f3befd62717fe7748cd3b0b92217f4d6d5b11..237a39e47db8223827320e070238e7b251d3bcc0 100644
--- a/source/libs/tdb/src/db/tdbUtil.c
+++ b/source/libs/tdb/src/db/tdbUtil.c
@@ -27,7 +27,7 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique) {
((uint64_t *)fileid)[0] = stDev;
((uint64_t *)fileid)[1] = stIno;
if (unique) {
- ((uint64_t *)fileid)[2] = rand();
+ ((uint64_t *)fileid)[2] = taosRand();
}
return 0;
diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c
index e1319da1624b0b32c9cd0e987768e0ea8e66e022..615c576a9b69e92ddea9ffd9125424eb0170316c 100644
--- a/source/libs/transport/src/rpcMain.c
+++ b/source/libs/transport/src/rpcMain.c
@@ -749,7 +749,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
memcpy(pConn->user, pHead->user, tListLen(pConn->user));
pConn->pRpc = pRpc;
pConn->sid = sid;
- pConn->tranId = (uint16_t)(rand() & 0xFFFF);
+ pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid);
pConn->linkUid = pHead->linkUid;
if (pRpc->afp) {
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 699da7579028dfd690a24a4136ea59617c398d00..0f6ac0b214741eab7f365603457479ff778cf876 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -34,9 +34,6 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
int32_t walRollback(SWal *pWal, int64_t ver) {
int code;
char fnameStr[WAL_FILE_LEN];
- if (ver == pWal->vers.lastVer) {
- return 0;
- }
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp
index 230555e016d58178b42e258b7c350aa0804d513b..f44fc719649b4226392098d09fe51a2092bb0d7c 100644
--- a/source/libs/wal/test/walMetaTest.cpp
+++ b/source/libs/wal/test/walMetaTest.cpp
@@ -124,13 +124,8 @@ class WalRetentionEnv : public ::testing::Test {
void SetUp() override {
SWalCfg cfg;
- cfg.rollPeriod = -1,
- cfg.segSize = -1,
- cfg.retentionPeriod = -1,
- cfg.retentionSize = 0,
- cfg.rollPeriod = 0,
- cfg.vgId = 0,
- cfg.level = TAOS_WAL_FSYNC;
+ cfg.rollPeriod = -1, cfg.segSize = -1, cfg.retentionPeriod = -1, cfg.retentionSize = 0, cfg.rollPeriod = 0,
+ cfg.vgId = 0, cfg.level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, &cfg);
ASSERT(pWal != NULL);
}
@@ -241,6 +236,12 @@ TEST_F(WalCleanEnv, rollback) {
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
}
+ code = walRollback(pWal, 12);
+ ASSERT_NE(code, 0);
+ ASSERT_EQ(pWal->vers.lastVer, 9);
+ code = walRollback(pWal, 9);
+ ASSERT_EQ(code, 0);
+ ASSERT_EQ(pWal->vers.lastVer, 8);
code = walRollback(pWal, 5);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 4);
@@ -299,7 +300,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
ASSERT_EQ(code, 0);
}
for (int i = 0; i < 1000; i++) {
- int ver = rand() % 100;
+ int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
@@ -324,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
TEST_F(WalRetentionEnv, repairMeta1) {
walResetEnv();
int code;
-
+
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
@@ -336,14 +337,14 @@ TEST_F(WalRetentionEnv, repairMeta1) {
TearDown();
- //getchar();
+ // getchar();
char buf[100];
sprintf(buf, "%s/meta-ver%d", pathName, 0);
taosRemoveFile(buf);
sprintf(buf, "%s/meta-ver%d", pathName, 1);
taosRemoveFile(buf);
SetUp();
- //getchar();
+ // getchar();
ASSERT_EQ(pWal->vers.lastVer, 99);
@@ -351,7 +352,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
ASSERT(pRead != NULL);
for (int i = 0; i < 1000; i++) {
- int ver = rand() % 100;
+ int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
@@ -381,7 +382,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
}
for (int i = 0; i < 1000; i++) {
- int ver = rand() % 200;
+ int ver = taosRand() % 200;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
@@ -401,5 +402,4 @@ TEST_F(WalRetentionEnv, repairMeta1) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
-
}
diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c
index 0b2fe904b37427b85dbf44f847af21c5a46472d6..63fa60021752686ea80a591160c6b7e2f65ae2a8 100644
--- a/source/os/src/osEnv.c
+++ b/source/os/src/osEnv.c
@@ -38,7 +38,7 @@ float tsNumOfCores = 0;
int64_t tsTotalMemoryKB = 0;
void osInit() {
- srand(taosSafeRand());
+ taosSeedRand(taosSafeRand());
taosGetSystemLocale(tsLocale, tsCharset);
taosGetSystemTimezone(tsTimezone);
taosSetSystemTimezone(tsTimezone, tsTimezone, &tsDaylight);
diff --git a/source/os/src/osRand.c b/source/os/src/osRand.c
index b81e41b3cf3b9631ee778a233f9d12cddfe534ed..f3dd9b74c5fe185f14d1e3b4832092eb5613b4df 100644
--- a/source/os/src/osRand.c
+++ b/source/os/src/osRand.c
@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-
+#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
@@ -21,8 +21,12 @@
#include
#endif
+void taosSeedRand(uint32_t seed) { return srand(seed); }
+
uint32_t taosRand(void) { return rand(); }
+uint32_t taosRandR(uint32_t *pSeed) { return rand_r(pSeed); }
+
uint32_t taosSafeRand(void) {
TdFilePtr pFile;
int seed;
diff --git a/source/util/src/tdes.c b/source/util/src/tdes.c
index 105dd7f95fc204d11ad9233372984944760dfd70..d12b47efe8ac390b001c9e80461c8a881cc19dbc 100644
--- a/source/util/src/tdes.c
+++ b/source/util/src/tdes.c
@@ -32,7 +32,7 @@ void process_message(uint8_t* message_piece, uint8_t* processed_piece, key_set*
#if 0
int64_t taosDesGenKey() {
uint32_t iseed = (uint32_t)time(NULL);
- srand(iseed);
+ taosSeedRand(iseed);
uint8_t key[8] = {0};
generate_key(key);
diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c
index 6b89ed2c430f240a3b1a219df85b9b876f560c9b..d9d6e4e3da74bca636e7372cda21b9482da7be51 100644
--- a/source/util/src/tskiplist.c
+++ b/source/util/src/tskiplist.c
@@ -51,7 +51,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
pSkipList->len = keyLen;
pSkipList->flags = flags;
pSkipList->keyFn = fn;
- pSkipList->seed = rand();
+ pSkipList->seed = taosRand();
#if 0
// the function getkeycomparfunc is defined in common
@@ -82,7 +82,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
}
}
- srand((uint32_t)time(NULL));
+ taosSeedRand((uint32_t)time(NULL));
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalMemSize += sizeof(SSkipList);
@@ -560,9 +560,9 @@ static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) {
int32_t n = 1;
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
- while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) {
+ while ((taosRand() % factor) == 0 && n <= pSkipList->maxLevel) {
#else
- while ((rand_r(&(pSkipList->seed)) % factor) == 0 && n <= pSkipList->maxLevel) {
+ while ((taosRandR(&(pSkipList->seed)) % factor) == 0 && n <= pSkipList->maxLevel) {
#endif
n++;
}
diff --git a/source/util/test/codingTests.cpp b/source/util/test/codingTests.cpp
index 0cd9524646f0bae67c8e24234d1a204302f90350..b99141104791e5b9bd41f92e883bc39aa67a3edb 100644
--- a/source/util/test/codingTests.cpp
+++ b/source/util/test/codingTests.cpp
@@ -150,7 +150,7 @@ static bool test_variant_int64(int64_t value) {
}
TEST(codingTest, fixed_encode_decode) {
- srand(time(0));
+ taosSeedRand(time(0));
// uint16_t
for (uint16_t value = 0; value <= UINT16_MAX; value++) {
@@ -204,7 +204,7 @@ TEST(codingTest, fixed_encode_decode) {
}
TEST(codingTest, variant_encode_decode) {
- srand(time(0));
+ taosSeedRand(time(0));
// uint16_t
for (uint16_t value = 0; value <= UINT16_MAX; value++) {
diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp
index f392aac7d16eb13908313639af8619e01d2c4760..e63e6f04a1a2605950b5b6acad758b620198f2a3 100644
--- a/source/util/test/pageBufferTest.cpp
+++ b/source/util/test/pageBufferTest.cpp
@@ -161,7 +161,7 @@ void recyclePageTest() {
TEST(testCase, resultBufferTest) {
- srand(time(NULL));
+ taosSeedRand(time(NULL));
simpleTest();
writeDownTest();
recyclePageTest();
diff --git a/source/util/test/skiplistTest.cpp b/source/util/test/skiplistTest.cpp
index f2e696b0e5b880190f4e7965a10b9265b99b070a..f61ebfd890bcf20682ab6a77e61d2b58c56368fb 100644
--- a/source/util/test/skiplistTest.cpp
+++ b/source/util/test/skiplistTest.cpp
@@ -47,7 +47,7 @@ void doubleSkipListTest() {
SSkipListKey sk;
for (int32_t i = 0; i < 100; ++i) {
sk.nType = TSDB_DATA_TYPE_DOUBLE;
- int32_t idx = abs((i * rand()) % 1000);
+ int32_t idx = abs((i * taosRand()) % 1000);
sk.dKey = doubleVal[idx];
@@ -74,7 +74,7 @@ void randKeyTest() {
false, getkey);
int32_t size = 200000;
- srand(time(NULL));
+ taosSeedRand(time(NULL));
printf("generated %d keys is: \n", size);
@@ -87,7 +87,7 @@ void randKeyTest() {
d->level = level;
int32_t* key = (int32_t*)SL_GET_NODE_KEY(pSkipList, d);
- key[0] = rand() % 1000000000;
+ key[0] = taosRand() % 1000000000;
key[1] = key[0];
@@ -337,7 +337,7 @@ void duplicatedKeyTest() {
TEST(testCase, skiplist_test) {
assert(sizeof(SSkipListKey) == 8);
- srand(time(NULL));
+ taosSeedRand(time(NULL));
stringKeySkiplistTest();
doubleSkipListTest();