diff --git a/cmake/cmake.options b/cmake/cmake.options index 44fa8c7e4b13bed3cf75a07c66c79054a0c253d0..faa45256fb7ed7395136ecf54d5f24448466720f 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -56,6 +56,12 @@ option( OFF ) +option( + BUILD_WITH_TRAFT + "If build with traft" + OFF +) + option( BUILD_DEPENDENCY_TESTS "If build dependency tests" diff --git a/cmake/traft_CMakeLists.txt.in b/cmake/traft_CMakeLists.txt.in new file mode 100644 index 0000000000000000000000000000000000000000..9b571b3666a7111eaba9fc333a3dadc9487811f9 --- /dev/null +++ b/cmake/traft_CMakeLists.txt.in @@ -0,0 +1,14 @@ + +# traft +ExternalProject_Add(traft + GIT_REPOSITORY https://github.com/taosdata/traft.git + GIT_TAG for_3.0 + SOURCE_DIR "${CMAKE_CONTRIB_DIR}/traft" + BINARY_DIR "${CMAKE_CONTRIB_DIR}/traft" + #BUILD_IN_SOURCE TRUE + # https://answers.ros.org/question/333125/how-to-include-external-automakeautoconf-projects-into-ament_cmake/ + CONFIGURE_COMMAND COMMAND autoreconf -i COMMAND ./configure --enable-example + BUILD_COMMAND "$(MAKE)" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index c08f894fe7e8f7dc83a0269ba606607974e4f1bd..074014b0d7ba89cd7a712bbd88e13fe2952e20fa 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -41,6 +41,12 @@ if(${BUILD_WITH_CRAFT}) SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE) endif(${BUILD_WITH_CRAFT}) +# traft +if(${BUILD_WITH_TRAFT}) + cat("${CMAKE_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE) +endif(${BUILD_WITH_TRAFT}) + #libuv if(${BUILD_WITH_UV}) cat("${CMAKE_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -173,6 +179,18 @@ if(${BUILD_WITH_CRAFT}) # ) endif(${BUILD_WITH_CRAFT}) +# TRAFT +if(${BUILD_WITH_TRAFT}) + add_library(traft STATIC IMPORTED GLOBAL) + set_target_properties(traft PROPERTIES + IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a" + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include" + ) + # target_link_libraries(craft + # INTERFACE pthread + # ) +endif(${BUILD_WITH_TRAFT}) + # LIBUV if(${BUILD_WITH_UV}) add_subdirectory(libuv) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index 330fe8f70f8d9bfcd9e09e77611ae67794cc86a0..0c71113056f178e04bc394a3b0afea36ebf7c886 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -19,4 +19,8 @@ if(${BUILD_WITH_CRAFT}) add_subdirectory(craft) endif(${BUILD_WITH_CRAFT}) +if(${BUILD_WITH_TRAFT}) + add_subdirectory(traft) +endif(${BUILD_WITH_TRAFT}) + add_subdirectory(tdev) diff --git a/contrib/test/craft/common.h b/contrib/test/craft/common.h index 1e94ee8bcae2b990d2fb7ace3d7c0f245bd4069c..282e2825431f62ce1b1d6b85290576d0a5b957f8 100644 --- a/contrib/test/craft/common.h +++ b/contrib/test/craft/common.h @@ -20,6 +20,7 @@ typedef struct { } Addr; typedef struct { + int voter; Addr me; Addr peers[MAX_PEERS]; int peersCount; diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c index 52e0b694dc325681c7c428e53e9db75b0a1bc3f3..bae083cf9451632a5d1e2b57b8546b60634674d1 100644 --- a/contrib/test/craft/raftMain.c +++ b/contrib/test/craft/raftMain.c @@ -104,7 +104,7 @@ const char* state2String(unsigned short state) { void printRaftConfiguration(struct raft_configuration *c) { printf("configuration: \n"); for (int i = 0; i < c->n; ++i) { - printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address); + printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); } } @@ -119,11 +119,9 @@ void printRaftState(struct raft *r) { printf("last_applied: %llu \n", r->last_applied); printf("last_stored: %llu \n", r->last_stored); - /* printf("configuration_index: %llu \n", r->configuration_index); printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); printRaftConfiguration(&r->configuration); - */ printf("----------------------------\n"); } @@ -164,6 +162,18 @@ void getValue(const char *key) { } } +void raft_change_cb_add(struct raft_change *req, int status) { + printf("raft_change_cb_add status:%d ... \n", status); +} + +void raft_change_cb_assign(struct raft_change *req, int status) { + printf("raft_change_cb_assign status:%d ... \n", status); +} + +void raft_change_cb_remove(struct raft_change *req, int status) { + printf("raft_change_cb_remove status:%d ... \n", status); +} + void console(SRaftServer *pRaftServer) { while (1) { char cmd_buf[COMMAND_LEN]; @@ -193,30 +203,59 @@ void console(SRaftServer *pRaftServer) { parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN); if (strcmp(cmd, "addnode") == 0) { - printf("not support \n"); + //printf("not support \n"); - /* char host[HOST_LEN]; uint32_t port; parseAddr(param1, host, HOST_LEN, &port); uint64_t rid = raftId(host, port); struct raft_change *req = raft_malloc(sizeof(*req)); - int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL); + int r = raft_add(&pRaftServer->raft, req, rid, param1, raft_change_cb_add); if (r != 0) { - printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft)); + printf("raft_add error: %s \n", raft_errmsg(&pRaftServer->raft)); } printf("add node: %lu %s \n", rid, param1); struct raft_change *req2 = raft_malloc(sizeof(*req2)); - r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL); + r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign); + if (r != 0) { + printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("raft_assign: %s %d \n", param1, RAFT_VOTER); + + } else if (strcmp(cmd, "activate") == 0) { + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + int r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign); if (r != 0) { - printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft)); + printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft)); } - */ + printf("raft_assign: %s %d \n", param1, RAFT_VOTER); + + + + } else if (strcmp(cmd, "dropnode") == 0) { - printf("not support \n"); + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + struct raft_change *req = raft_malloc(sizeof(*req)); + int r = raft_remove(&pRaftServer->raft, req, rid, raft_change_cb_remove); + if (r != 0) { + printf("raft_remove: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("drop node: %lu %s \n", rid, param1); + + } else if (strcmp(cmd, "put") == 0) { char buf[256]; @@ -234,6 +273,7 @@ void console(SRaftServer *pRaftServer) { } else if (strcmp(cmd, "help") == 0) { printf("addnode \"127.0.0.1:8888\" \n"); + printf("activate \"127.0.0.1:8888\" \n"); printf("dropnode \"127.0.0.1:8888\" \n"); printf("put key value \n"); printf("get key \n"); @@ -256,7 +296,9 @@ void *startConsoleFunc(void *param) { // Config --------------------------------- void usage() { printf("\nusage: \n"); - printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10000 --dir=./data --voter \n", exe_name); + printf("%s --me=127.0.0.1:10001 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10002 --dir=./data \n", exe_name); printf("\n"); printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name); printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name); @@ -271,13 +313,15 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { option_index = 0; static struct option long_options[] = { {"help", no_argument, NULL, 'h'}, + {"voter", no_argument, NULL, 'v'}, {"peers", required_argument, NULL, 'p'}, {"me", required_argument, NULL, 'm'}, {"dir", required_argument, NULL, 'd'}, {NULL, 0, NULL, 0} }; - while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) { + pConf->voter = 0; + while ((option_value = getopt_long(argc, argv, "hvp:m:d:", long_options, &option_index)) != -1) { switch (option_value) { case 'm': { parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); @@ -295,6 +339,10 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { break; } + case 'v': { + pConf->voter = 1; + break; + } case 'd': { snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg); @@ -338,6 +386,8 @@ int main(int argc, char **argv) { exit(-1); } + signal(SIGPIPE, SIG_IGN); + SRaftServerConfig conf; parseConf(argc, argv, &conf); printConf(&conf); diff --git a/contrib/test/craft/raftServer.c b/contrib/test/craft/raftServer.c index 6f4dbc1997ce3e3036592a072347e32c7596245a..ffec22e6463b0cafcfd986d9980e3466a7c9c20d 100644 --- a/contrib/test/craft/raftServer.c +++ b/contrib/test/craft/raftServer.c @@ -85,29 +85,45 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, pRaftServer->fsm = pFsm; ret = uv_loop_init(&pRaftServer->loop); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } struct raft_configuration conf; raft_configuration_init(&conf); - raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + + if (pConf->voter == 0) { + raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_SPARE); + + } else { + raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + + } + + + printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address); + + for (int i = 0; i < pConf->peersCount; ++i) { const Addr *pAddr = &pConf->peers[i]; raft_id rid = raftId(pAddr->host, pAddr->port); diff --git a/contrib/test/traft/CMakeLists.txt b/contrib/test/traft/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..e29fea04f187d7fbd220978e0539c8929d7b7a5c --- /dev/null +++ b/contrib/test/traft/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(raftMain "") +target_sources(raftMain + PRIVATE + "raftMain.c" + "raftServer.c" +) +target_link_libraries(raftMain PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/clear.sh b/contrib/test/traft/clear.sh new file mode 100644 index 0000000000000000000000000000000000000000..398b3088f20ae8cce179ff909f206fc162918876 --- /dev/null +++ b/contrib/test/traft/clear.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +rm -rf 127.0.0.1* +rm -rf ./data diff --git a/contrib/test/traft/common.h b/contrib/test/traft/common.h new file mode 100644 index 0000000000000000000000000000000000000000..0229c29cf75e709d2441ed36c153ee41a29cfacc --- /dev/null +++ b/contrib/test/traft/common.h @@ -0,0 +1,36 @@ +#ifndef TDENGINE_COMMON_H +#define TDENGINE_COMMON_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +#define MAX_INSTANCE_NUM 100 + +#define MAX_PEERS 10 +#define COMMAND_LEN 1024 +#define TOKEN_LEN 128 +#define DIR_LEN 256 +#define HOST_LEN 64 +#define ADDRESS_LEN (HOST_LEN + 16) + +typedef struct { + char host[HOST_LEN]; + uint32_t port; +} Addr; + +typedef struct { + Addr me; + Addr peers[MAX_PEERS]; + int peersCount; + char dir[DIR_LEN]; + char dataDir[DIR_LEN + HOST_LEN * 2]; +} SRaftServerConfig; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_COMMON_H diff --git a/contrib/test/traft/help.txt b/contrib/test/traft/help.txt new file mode 100644 index 0000000000000000000000000000000000000000..7709e80e0a9ded97a16cc4d3e31e26dfdacc5e4f --- /dev/null +++ b/contrib/test/traft/help.txt @@ -0,0 +1,18 @@ + +make raftServer + +all: + gcc raftMain.c raftServer.c -I ../../traft/include/ ../../traft/.libs/libraft.a -o raftMain -luv -llz4 -lpthread -g +clean: + rm -f raftMain + sh clear.sh + + +make traft: + +sudo apt-get install libuv1-dev liblz4-dev +autoreconf -i +./configure --enable-example +make + + diff --git a/contrib/test/traft/raftMain.c b/contrib/test/traft/raftMain.c new file mode 100644 index 0000000000000000000000000000000000000000..24ad93856c5c21cb74d9799ff87c5f1d2c8481be --- /dev/null +++ b/contrib/test/traft/raftMain.c @@ -0,0 +1,659 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "raftServer.h" +#include "common.h" + +const char *exe_name; + +typedef struct LeaderState { + char address[48]; + int leaderCount; + +} LeaderState; + +#define NODE_COUNT 3 +LeaderState leaderStates[NODE_COUNT]; + +void printLeaderCount() { + for (int i = 0; i < NODE_COUNT; ++i) { + printf("%s: leaderCount:%d \n", leaderStates[i].address, leaderStates[i].leaderCount); + } +} + +void updateLeaderStates(SRaftServer *pRaftServer) { + for (int i = 0; i < pRaftServer->instance[0].raft.configuration.n; ++i) { + snprintf(leaderStates[i].address, sizeof(leaderStates[i].address), "%s", pRaftServer->instance[0].raft.configuration.servers[i].address); + leaderStates[i].leaderCount = 0; + } + + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + struct raft *r = &pRaftServer->instance[i].raft; + + char leaderAddress[128]; + memset(leaderAddress, 0, sizeof(leaderAddress)); + + if (r->state == RAFT_LEADER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address); + } else if (r->state == RAFT_FOLLOWER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address); + } + + for (int j = 0; j < NODE_COUNT; j++) { + if (strcmp(leaderAddress, leaderStates[j].address) == 0) { + leaderStates[j].leaderCount++; + } + } + } +} + + +void raftTransferCb(struct raft_transfer *req) { + SRaftServer *pRaftServer = req->data; + raft_free(req); + + printf("raftTransferCb: \n"); + updateLeaderStates(pRaftServer); + printLeaderCount(); + + int myLeaderCount; + for (int i = 0; i < NODE_COUNT; ++i) { + if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { + myLeaderCount = leaderStates[i].leaderCount; + } + } + + printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); + if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { + struct raft *r; + for (int j = 0; j < pRaftServer->instanceCount; ++j) { + if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { + r = &pRaftServer->instance[j].raft; + } + } + + struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); + transfer->data = pRaftServer; + + uint64_t destRaftId; + int minIndex = -1; + int minLeaderCount = myLeaderCount; + for (int j = 0; j < NODE_COUNT; ++j) { + if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; + if (leaderStates[j].leaderCount <= minLeaderCount) { + minIndex = j; + } + } + + char myHost[48]; + uint16_t myPort; + uint16_t myVid; + decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); + + char *destAddress = leaderStates[minIndex].address; + + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + splitString(destAddress, ":", tokens, 2); + char *destHost = tokens[0]; + uint16_t destPort = atoi(tokens[1]); + destRaftId = encodeRaftId(destHost, destPort, myVid); + + raft_transfer(r, transfer, destRaftId, raftTransferCb); + } + +} + + +void parseAddr(const char *addr, char *host, int len, uint32_t *port) { + char* tmp = (char*)malloc(strlen(addr) + 1); + strcpy(tmp, addr); + + char* context; + char* separator = ":"; + char* token = strtok_r(tmp, separator, &context); + if (token) { + snprintf(host, len, "%s", token); + } + + token = strtok_r(NULL, separator, &context); + if (token) { + sscanf(token, "%u", port); + } + + free(tmp); +} + +// only parse 3 tokens +int parseCommand3(const char* str, char* token1, char* token2, char* token3, int len) +{ + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + + char* context; + char* separator = " "; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token1, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token2, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token3, token, len); + n++; + } + +ret: + return n; + free(tmp); +} + +// only parse 4 tokens +int parseCommand4(const char* str, char* token1, char* token2, char* token3, char *token4, int len) +{ + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + + char* context; + char* separator = " "; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token1, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token2, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token3, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token4, token, len); + n++; + } + +ret: + return n; + free(tmp); +} + +void *startServerFunc(void *param) { + SRaftServer *pServer = (SRaftServer*)param; + int32_t r = raftServerStart(pServer); + assert(r == 0); + + return NULL; +} + +// Console --------------------------------- +const char* state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + + } + return "UNKNOWN_RAFT_STATE"; +} + + +void printRaftState2(struct raft *r) { + + char leaderAddress[128]; + memset(leaderAddress, 0, sizeof(leaderAddress)); + + if (r->state == RAFT_LEADER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address); + } else if (r->state == RAFT_FOLLOWER) { + snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address); + } + + for (int i = 0; i < r->configuration.n; ++i) { + char tmpAddress[128]; + snprintf(tmpAddress, sizeof(tmpAddress), "%s", r->configuration.servers[i].address); + + uint64_t raftId = r->configuration.servers[i].id; + char host[128]; + uint16_t port; + uint16_t vid; + decodeRaftId(raftId, host, 128, &port, &vid); + + char buf[512]; + memset(buf, 0, sizeof(buf)); + if (strcmp(tmpAddress, leaderAddress) == 0) { + snprintf(buf, sizeof(buf), "<%s:%u-%u-LEADER>\t", host, port, vid); + } else { + snprintf(buf, sizeof(buf), "<%s:%u-%u-FOLLOWER>\t", host, port, vid); + } + printf("%s", buf); + } + printf("\n"); +} + +void printRaftConfiguration(struct raft_configuration *c) { + printf("configuration: \n"); + for (int i = 0; i < c->n; ++i) { + printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); + } +} + +void printRaftState(struct raft *r) { + printf("----Raft State: -----------\n"); + printf("mem_addr: %p \n", r); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + + printf("configuration_index: %llu \n", r->configuration_index); + printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); + printRaftConfiguration(&r->configuration); + + printf("----------------------------\n"); +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + raft_free(req); + struct raft *r = req->data; + if (status != 0) { + printf("putValueCb: %s \n", raft_errmsg(r)); + } else { + printf("putValueCb: %s \n", "ok"); + } +} + +void putValue(struct raft *r, const char *value) { + struct raft_buffer buf; + + buf.len = TOKEN_LEN;; + buf.base = raft_malloc(buf.len); + snprintf(buf.base, buf.len, "%s", value); + + struct raft_apply *req = raft_malloc(sizeof(struct raft_apply)); + req->data = r; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char*)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +void getValue(const char *key) { + char *ptr = getKV(key); + if (ptr) { + printf("get value: [%s] \n", ptr); + } else { + printf("value not found for key: [%s] \n", key); + } +} + +void console(SRaftServer *pRaftServer) { + while (1) { + char cmd_buf[COMMAND_LEN]; + memset(cmd_buf, 0, sizeof(cmd_buf)); + char *ret = fgets(cmd_buf, COMMAND_LEN, stdin); + if (!ret) { + exit(-1); + } + + int pos = strlen(cmd_buf); + if(cmd_buf[pos - 1] == '\n') { + cmd_buf[pos - 1] = '\0'; + } + + if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) { + continue; + } + + char cmd[TOKEN_LEN]; + memset(cmd, 0, sizeof(cmd)); + + char param1[TOKEN_LEN]; + memset(param1, 0, sizeof(param1)); + + char param2[TOKEN_LEN]; + memset(param2, 0, sizeof(param2)); + + char param3[TOKEN_LEN]; + memset(param2, 0, sizeof(param2)); + + parseCommand4(cmd_buf, cmd, param1, param2, param3, TOKEN_LEN); + if (strcmp(cmd, "addnode") == 0) { + printf("not support \n"); + + /* + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + struct raft_change *req = raft_malloc(sizeof(*req)); + int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL); + if (r != 0) { + printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("add node: %lu %s \n", rid, param1); + + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL); + if (r != 0) { + printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft)); + } + */ + + } else if (strcmp(cmd, "dropnode") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "rebalance") == 0) { + + /* + updateLeaderStates(pRaftServer); + + int myLeaderCount; + for (int i = 0; i < NODE_COUNT; ++i) { + if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { + myLeaderCount = leaderStates[i].leaderCount; + } + } + + while (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { + printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); + + struct raft *r; + for (int j = 0; j < pRaftServer->instanceCount; ++j) { + if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { + r = &pRaftServer->instance[j].raft; + } + } + + struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); + transfer->data = pRaftServer; + + uint64_t destRaftId; + int minIndex = -1; + int minLeaderCount = myLeaderCount; + for (int j = 0; j < NODE_COUNT; ++j) { + if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; + + printf("-----leaderStates[%d].leaderCount:%d \n", j, leaderStates[j].leaderCount); + if (leaderStates[j].leaderCount <= minLeaderCount) { + minIndex = j; + printf("++++ assign minIndex : %d \n", minIndex); + } + } + + printf("minIndex:%d minLeaderCount:%d \n", minIndex, minLeaderCount); + + char myHost[48]; + uint16_t myPort; + uint16_t myVid; + decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); + + char *destAddress = leaderStates[minIndex].address; + + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + splitString(destAddress, ":", tokens, 2); + char *destHost = tokens[0]; + uint16_t destPort = atoi(tokens[1]); + destRaftId = encodeRaftId(destHost, destPort, myVid); + + printf("destHost:%s destPort:%u myVid:%u", destHost, destPort, myVid); + raft_transfer(r, transfer, destRaftId, raftTransferCb); + sleep(1); + + for (int i = 0; i < NODE_COUNT; ++i) { + if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) { + myLeaderCount = leaderStates[i].leaderCount; + } + } + } + */ + + + int leaderCount = 0; + + struct raft *firstR; + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + struct raft *r = &pRaftServer->instance[i].raft; + if (r->state == RAFT_LEADER) { + leaderCount++; + firstR = r; + } + } + + if (leaderCount > pRaftServer->instanceCount / NODE_COUNT) { + struct raft_transfer *transfer = raft_malloc(sizeof(*transfer)); + transfer->data = pRaftServer; + raft_transfer(firstR, transfer, 0, raftTransferCb); + } + + + } else if (strcmp(cmd, "put") == 0) { + char buf[256]; + uint16_t vid; + sscanf(param1, "%hu", &vid); + snprintf(buf, sizeof(buf), "%s--%s", param2, param3); + putValue(&pRaftServer->instance[vid].raft, buf); + + } else if (strcmp(cmd, "get") == 0) { + getValue(param1); + + } else if (strcmp(cmd, "transfer") == 0) { + uint16_t vid; + sscanf(param1, "%hu", &vid); + + struct raft_transfer transfer; + raft_transfer(&pRaftServer->instance[vid].raft, &transfer, 0, NULL); + + + } else if (strcmp(cmd, "state") == 0) { + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + printf("instance %d: ", i); + printRaftState(&pRaftServer->instance[i].raft); + } + + } else if (strcmp(cmd, "state2") == 0) { + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + printRaftState2(&pRaftServer->instance[i].raft); + } + + } else if (strcmp(cmd, "snapshot") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "help") == 0) { + printf("addnode \"127.0.0.1:8888\" \n"); + printf("dropnode \"127.0.0.1:8888\" \n"); + printf("put key value \n"); + printf("get key \n"); + printf("state \n"); + + } else { + printf("unknown command: [%s], type \"help\" to see help \n", cmd); + } + + //printf("cmd_buf: [%s] \n", cmd_buf); + } +} + +void *startConsoleFunc(void *param) { + SRaftServer *pServer = (SRaftServer*)param; + console(pServer); + return NULL; +} + +// Config --------------------------------- +void usage() { + printf("\nusage: \n"); + printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("\n"); + printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name); + printf("\n"); +} + +void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { + memset(pConf, 0, sizeof(*pConf)); + + int option_index, option_value; + option_index = 0; + static struct option long_options[] = { + {"help", no_argument, NULL, 'h'}, + {"peers", required_argument, NULL, 'p'}, + {"me", required_argument, NULL, 'm'}, + {"dir", required_argument, NULL, 'd'}, + {NULL, 0, NULL, 0} + }; + + while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) { + switch (option_value) { + case 'm': { + parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); + break; + } + + case 'p': { + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + int peerCount = splitString(optarg, ",", tokens, MAX_PEERS); + pConf->peersCount = peerCount; + for (int i = 0; i < peerCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port); + } + break; + } + + + case 'd': { + snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg); + break; + } + + case 'h': { + usage(); + exit(-1); + } + + default: { + usage(); + exit(-1); + } + } + } + snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s_%u", pConf->dir, pConf->me.host, pConf->me.port); +} + +void printConf(SRaftServerConfig *pConf) { + printf("\nconf: \n"); + printf("me: %s:%u \n", pConf->me.host, pConf->me.port); + printf("peersCount: %d \n", pConf->peersCount); + for (int i = 0; i < pConf->peersCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port); + } + printf("dataDir: %s \n\n", pConf->dataDir); + +} + + +int main(int argc, char **argv) { + srand(time(NULL)); + int32_t ret; + + exe_name = argv[0]; + if (argc < 3) { + usage(); + exit(-1); + } + + SRaftServerConfig conf; + parseConf(argc, argv, &conf); + printConf(&conf); + + signal(SIGPIPE, SIG_IGN); + + /* + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir); + system(cmd_buf); + */ + + + struct raft_fsm fsm; + initFsm(&fsm); + + SRaftServer raftServer; + ret = raftServerInit(&raftServer, &conf, &fsm); + assert(ret == 0); + + pthread_t tidRaftServer; + pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer); + + pthread_t tidConsole; + pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer); + + while (1) { + sleep(10); + } + + return 0; +} diff --git a/contrib/test/traft/raftServer.c b/contrib/test/traft/raftServer.c new file mode 100644 index 0000000000000000000000000000000000000000..94de49cd0fd141e0afb35db2bd1946272b1d5c2f --- /dev/null +++ b/contrib/test/traft/raftServer.c @@ -0,0 +1,222 @@ +#include +#include +#include "common.h" +#include "raftServer.h" + +char *keys; +char *values; + +void initStore() { + keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + writeIndex = 0; +} + +void destroyStore() { + free(keys); + free(values); +} + +void putKV(const char *key, const char *value) { + if (writeIndex < MAX_RECORD_COUNT) { + strncpy(&keys[writeIndex], key, MAX_KV_LEN); + strncpy(&values[writeIndex], value, MAX_KV_LEN); + writeIndex++; + } +} + +char *getKV(const char *key) { + for (int i = 0; i < MAX_RECORD_COUNT; ++i) { + if (strcmp(&keys[i], key) == 0) { + return &values[i]; + } + } + return NULL; +} + + +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr) +{ + if (n_arr <= 0) { + return -1; + } + + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + char* context; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + + while (1) { + token = strtok_r(NULL, separator, &context); + if (!token || n >= n_arr) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + } + +ret: + free(tmp); + return n; +} + +/* +uint64_t raftId(const char *host, uint32_t port) { + uint32_t host_uint32 = (uint32_t)inet_addr(host); + assert(host_uint32 != (uint32_t)-1); + uint64_t code = ((uint64_t)host_uint32) << 32 | port; + return code; +} +*/ + + +/* +uint64_t encodeRaftId(const char *host, uint16_t port, uint16_t vid) { + uint64_t raftId; + uint32_t host_uint32 = (uint32_t)inet_addr(host); + assert(host_uint32 != (uint32_t)-1); + + raftId = (((uint64_t)host_uint32) << 32) | (((uint32_t)port) << 16) | vid; + return raftId; +} + +void decodeRaftId(uint64_t raftId, char *host, int32_t len, uint16_t *port, uint16_t *vid) { + uint32_t host32 = (uint32_t)((raftId >> 32) & 0x00000000FFFFFFFF); + + struct in_addr addr; + addr.s_addr = host32; + snprintf(host, len, "%s", inet_ntoa(addr)); + + *port = (uint16_t)((raftId & 0x00000000FFFF0000) >> 16); + *vid = (uint16_t)(raftId & 0x000000000000FFFF); +} +*/ + + + + +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) { + int ret; + + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); + pRaftServer->port = pConf->me.port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); + //strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir)); + + ret = uv_loop_init(&pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret)); + assert(0); + } + + ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret); + assert(0); + } + + + uint16_t vid; + pRaftServer->instanceCount = 20; + + + for (int i = 0; i < pRaftServer->instanceCount; ++i) + { + //vid = 0; + vid = i; + + + pRaftServer->instance[vid].raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); + snprintf(pRaftServer->instance[vid].dir, sizeof(pRaftServer->instance[vid].dir), "%s_%llu", pConf->dataDir, pRaftServer->instance[vid].raftId); + + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", pRaftServer->instance[vid].dir); + system(cmd_buf); + sleep(1); + + pRaftServer->instance[vid].fsm = pFsm; + + ret = raft_uv_init(&pRaftServer->instance[vid].io, &pRaftServer->loop, pRaftServer->instance[vid].dir, &pRaftServer->transport); + if (ret != 0) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft)); + assert(0); + } + + ret = raft_init(&pRaftServer->instance[vid].raft, &pRaftServer->instance[vid].io, pRaftServer->instance[vid].fsm, pRaftServer->instance[vid].raftId, pRaftServer->address); + if (ret != 0) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft)); + assert(0); + } + + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftServer->instance[vid].raftId, pRaftServer->address, RAFT_VOTER); + printf("add myself: %llu - %s \n", pRaftServer->instance[vid].raftId, pRaftServer->address); + for (int i = 0; i < pConf->peersCount; ++i) { + const Addr *pAddr = &pConf->peers[i]; + + raft_id rid = encodeRaftId(pAddr->host, pAddr->port, vid); + + char addrBuf[ADDRESS_LEN]; + snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port); + raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER); + printf("add peers: %llu - %s \n", rid, addrBuf); + } + + raft_bootstrap(&pRaftServer->instance[vid].raft, &conf); + + } + + + + + + + + return 0; +} + +int32_t raftServerStart(SRaftServer *pRaftServer) { + int ret; + + for (int i = 0; i < pRaftServer->instanceCount; ++i) { + ret = raft_start(&pRaftServer->instance[i].raft); + if (ret != 0) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[i].raft)); + } + + } + + + uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); +} + + +void raftServerClose(SRaftServer *pRaftServer) { + +} + + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { + char *msg = (char*)buf->base; + printf("fsm apply: %s \n", msg); + + char arr[2][MAX_TOKEN_LEN]; + splitString(msg, "--", arr, 2); + putKV(arr[0], arr[1]); + + return 0; +} + +int32_t initFsm(struct raft_fsm *fsm) { + initStore(); + fsm->apply = fsmApplyCb; + return 0; +} diff --git a/contrib/test/traft/raftServer.h b/contrib/test/traft/raftServer.h new file mode 100644 index 0000000000000000000000000000000000000000..b1f62caac590a2aa3f7d671378a0dcea00189b62 --- /dev/null +++ b/contrib/test/traft/raftServer.h @@ -0,0 +1,68 @@ +#ifndef TDENGINE_RAFT_SERVER_H +#define TDENGINE_RAFT_SERVER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include "raft.h" +#include "raft/uv.h" +#include "common.h" + + +// simulate a db store, just for test +#define MAX_KV_LEN 100 +#define MAX_RECORD_COUNT 500 +char *keys; +char *values; +int writeIndex; + +void initStore(); +void destroyStore(); +void putKV(const char *key, const char *value); +char *getKV(const char *key); + +typedef struct { + char dir[DIR_LEN + HOST_LEN * 4]; /* Data dir of UV I/O backend */ + raft_id raftId; /* For vote */ + struct raft_fsm *fsm; /* Sample application FSM */ + struct raft raft; /* Raft instance */ + struct raft_io io; /* UV I/O backend */ + +} SInstance; + +typedef struct { + char host[HOST_LEN]; + uint32_t port; + char address[ADDRESS_LEN]; /* Raft instance address */ + + struct uv_loop_s loop; /* UV loop */ + struct raft_uv_transport transport; /* UV I/O backend transport */ + + SInstance instance[MAX_INSTANCE_NUM]; + int32_t instanceCount; + +} SRaftServer; + +#define MAX_TOKEN_LEN 32 +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr); + +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm); +int32_t raftServerStart(SRaftServer *pRaftServer); +void raftServerClose(SRaftServer *pRaftServer); + + +int initFsm(struct raft_fsm *fsm); + + + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RAFT_SERVER_H diff --git a/include/util/tfile.h b/include/util/tfile.h index af4c19e7d19ebcd6fb9d24f435bb51073cd9836b..b3d141c4434154bf0ac6887aae1dc0e0a12b8efa 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -38,6 +38,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname); int64_t tfClose(int64_t tfd); int64_t tfWrite(int64_t tfd, void *buf, int64_t count); int64_t tfRead(int64_t tfd, void *buf, int64_t count); +int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset); int32_t tfFsync(int64_t tfd); bool tfValid(int64_t tfd); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); @@ -47,4 +48,4 @@ int32_t tfFtruncate(int64_t tfd, int64_t length); } #endif -#endif /*_TD_UTIL_FILE_H*/ +#endif /*_TD_UTIL_FILE_H*/ diff --git a/include/util/tqueue.h b/include/util/tqueue.h index bcb9aea856a104ce71b8355d607f6437d768a3a2..a57bdb5ce8270d65e7280412e90961360036239c 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -22,59 +22,57 @@ extern "C" { /* -This set of API for queue is designed specially for vnode/mnode. The main purpose is to -consume all the items instead of one item from a queue by one single read. Also, it can -combine multiple queues into a queue set, a consumer thread can consume a queue set via +This set of API for queue is designed specially for vnode/mnode. The main purpose is to +consume all the items instead of one item from a queue by one single read. Also, it can +combine multiple queues into a queue set, a consumer thread can consume a queue set via a single API instead of looping every queue by itself. Notes: -1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe +1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe 2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe. 3: read/write operation APIs are multi-thread safe To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c) -shall be used to set up the protection. +shall be used to set up the protection. */ -typedef void *taos_queue; -typedef void *taos_qset; -typedef void *taos_qall; +typedef struct STaosQueue STaosQueue; +typedef struct STaosQset STaosQset; +typedef struct STaosQall STaosQall; typedef void (*FProcessItem)(void *ahandle, void *pItem); -typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems); - -taos_queue taosOpenQueue(); -void taosCloseQueue(taos_queue); -void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems); -void *taosAllocateQitem(int size); -void taosFreeQitem(void *pItem); -int taosWriteQitem(taos_queue, void *pItem); -int taosReadQitem(taos_queue, void **pItem); -bool taosQueueEmpty(taos_queue); - -taos_qall taosAllocateQall(); -void taosFreeQall(taos_qall); -int taosReadAllQitems(taos_queue, taos_qall); -int taosGetQitem(taos_qall, void **pItem); -void taosResetQitems(taos_qall); - -taos_qset taosOpenQset(); -void taosCloseQset(); -void taosQsetThreadResume(taos_qset param); -int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); -void taosRemoveFromQset(taos_qset, taos_queue); -int taosGetQueueNumber(taos_qset); - -int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *); -int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *); - -int taosGetQueueItemsNumber(taos_queue param); -int taosGetQsetItemsNumber(taos_qset param); +typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); + +STaosQueue *taosOpenQueue(); +void taosCloseQueue(STaosQueue *queue); +void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp); +void *taosAllocateQitem(int32_t size); +void taosFreeQitem(void *pItem); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem); +int32_t taosReadQitem(STaosQueue *queue, void **ppItem); +bool taosQueueEmpty(STaosQueue *queue); + +STaosQall *taosAllocateQall(); +void taosFreeQall(STaosQall *qall); +int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); +int32_t taosGetQitem(STaosQall *qall, void **ppItem); +void taosResetQitems(STaosQall *qall); + +STaosQset *taosOpenQset(); +void taosCloseQset(STaosQset *qset); +void taosQsetThreadResume(STaosQset *qset); +int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); +void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); +int32_t taosGetQueueNumber(STaosQset *qset); + +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp); +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp); + +int32_t taosGetQueueItemsNumber(STaosQueue *queue); +int32_t taosGetQsetItemsNumber(STaosQset *qset); #ifdef __cplusplus } #endif #endif /*_TD_UTIL_QUEUE_H*/ - - diff --git a/include/util/tworker.h b/include/util/tworker.h index 2e5852cbbac6b305b55bc8ec1cf04b34f254663b..27f03bd2b69e6d6add97cdc2e5ab05ad2fdd5eca 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -35,7 +35,7 @@ typedef struct SWorkerPool { int32_t max; // max number of workers int32_t min; // min number of workers int32_t num; // current number of workers - taos_qset qset; + STaosQset *qset; const char *name; SWorker *workers; pthread_mutex_t mutex; @@ -44,8 +44,8 @@ typedef struct SWorkerPool { typedef struct SMWorker { int32_t id; // worker id pthread_t thread; // thread - taos_qall qall; - taos_qset qset; // queue set + STaosQall *qall; + STaosQset *qset; // queue set SMWorkerPool *pool; } SMWorker; @@ -57,15 +57,15 @@ typedef struct SMWorkerPool { pthread_mutex_t mutex; } SMWorkerPool; -int32_t tWorkerInit(SWorkerPool *pool); -void tWorkerCleanup(SWorkerPool *pool); -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); -void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue); +int32_t tWorkerInit(SWorkerPool *pool); +void tWorkerCleanup(SWorkerPool *pool); +STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); +void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue); -int32_t tMWorkerInit(SMWorkerPool *pool); -void tMWorkerCleanup(SMWorkerPool *pool); -taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); -void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue); +int32_t tMWorkerInit(SMWorkerPool *pool); +void tMWorkerCleanup(SMWorkerPool *pool); +STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); +void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 8161b8d125ebb9f9fec4762f0327e2adc3a5449f..cedab6266e65734fc0f18790788bd0b2bb4b9bf6 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -140,7 +140,7 @@ void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = 30000000; //3.0.0.0 pOption->numOfCores = tsNumOfCores; pOption->numOfSupportVnodes = tsNumOfSupportVnodes; - pOption->numOfCommitThreads = 1; + pOption->numOfCommitThreads = tsNumOfCommitThreads; pOption->statusInterval = tsStatusInterval; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->ratioOfQueryCores = tsRatioOfQueryCores; diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 954e21aefa98ba9f8a14aae9ca227e7400fb8ee5..07c8ce5d02db09a19db429801b637e4e7dc73e6d 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -64,7 +64,7 @@ typedef struct { int32_t maxNum; void *queueFp; SDnode *pDnode; - taos_queue queue; + STaosQueue *queue; union { SWorkerPool pool; SMWorkerPool mpool; @@ -92,7 +92,7 @@ typedef struct { SDnodeEps *dnodeEps; pthread_t *threadId; SRWLatch latch; - taos_queue pMgmtQ; + STaosQueue *pMgmtQ; SWorkerPool mgmtPool; } SDnodeMgmt; diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index 66b619318d0690e3640668a4db6013b1b8696c9d..c12d449517adcf5fbeafe1a5f727177000ab10b3 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -19,7 +19,7 @@ #include "dndTransport.h" #include "dndWorker.h" -static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs); +static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SBnode *dndAcquireBnode(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; @@ -286,7 +286,7 @@ static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) { taosFreeQitem(pMsg); } -static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) { +static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { for (int32_t i = 0; i < numOfMsgs; ++i) { SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); @@ -294,7 +294,7 @@ static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t cod } } -static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { SBnode *pBnode = dndAcquireBnode(pDnode); if (pBnode == NULL) { dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 3eab3e5aec734dc1993bcd37b745529204b1e562..8835e0ba65cbb4a2f8a8721391a606e6e47bdac1 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -27,20 +27,20 @@ typedef struct { } SWrapperCfg; typedef struct { - int32_t vgId; - int32_t refCount; - int32_t vgVersion; - int8_t dropped; - int8_t accessState; - uint64_t dbUid; - char *db; - char *path; - SVnode *pImpl; - taos_queue pWriteQ; - taos_queue pSyncQ; - taos_queue pApplyQ; - taos_queue pQueryQ; - taos_queue pFetchQ; + int32_t vgId; + int32_t refCount; + int32_t vgVersion; + int8_t dropped; + int8_t accessState; + uint64_t dbUid; + char *db; + char *path; + SVnode *pImpl; + STaosQueue *pWriteQ; + STaosQueue *pSyncQ; + STaosQueue *pApplyQ; + STaosQueue *pQueryQ; + STaosQueue* pFetchQ; } SVnodeObj; typedef struct { @@ -72,9 +72,9 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); -static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); +static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); +static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); +static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -768,7 +768,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp); } -static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -804,7 +804,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t taosArrayDestroy(pArray); } -static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -815,7 +815,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t } } -static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -826,7 +826,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { +static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { int32_t code = 0; if (pQueue == NULL) { diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index aafbb1b4e7dd1429e8111d8522cf1589cd6aa4cd..af8af6a05268fc1532526cdf03509a5d2e8c97a6 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -81,7 +81,7 @@ int metaOpenDB(SMeta *pMeta) { return -1; } - if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db", false) < 0) { + if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "schema.db", false) < 0) { metaCloseDB(pMeta); return -1; } @@ -558,6 +558,12 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0); +#if 0 + DB_BTREE_STAT *sp; + pDB->pTbDB->stat(pDB->pTbDB, NULL, &sp, 0); + printf("**************** %ld\n", sp->bt_nkeys); +#endif + return pTbCur; } diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 73c79b2619512a3f769911f86d2ae65fe45af8b6..072ec93e4e648ae55c945f291e0c0d73dc51cbc0 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -142,7 +142,8 @@ uint64_t fstStateInputLen(FstState* state); // end_addr uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data); uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes); -uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans); +uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, + uint64_t nTrans); // input uint8_t fstStateInput(FstState* state, FstNode* node); uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i); @@ -255,9 +256,10 @@ typedef struct FstMeta { } FstMeta; typedef struct Fst { - FstMeta* meta; - FstSlice* data; // - FstNode* root; // + FstMeta* meta; + FstSlice* data; // + FstNode* root; // + pthread_mutex_t mtx; } Fst; // refactor simple function @@ -310,7 +312,8 @@ StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* sta void swsResultDestroy(StreamWithStateResult* result); typedef void* (*StreamCallback)(void*); -StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max); +StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, + FstBoundWithData* max); void streamWithStateDestroy(StreamWithState* sws); diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index d8e69b5fdece6eeae9d0ce4809fb2d04f2b91e4c..4928e01a6322ae487c00580d935ba37d9f0c00fd 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -77,6 +77,7 @@ typedef struct TFileReader { Fst* fst; WriterCtx* ctx; TFileHeader header; + bool remove; } TFileReader; typedef struct IndexTFile { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 6d64bdbd65858fe42fba06f090d69ad357e1ab36..5167196031eb822b5540fa3c7b224e64ac859b0a 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -94,7 +94,6 @@ void indexClose(SIndex* sIdx) { #endif #ifdef USE_INVERTED_INDEX - indexCacheDestroy(sIdx->cache); void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; @@ -104,6 +103,7 @@ void indexClose(SIndex* sIdx) { taosHashCleanup(sIdx->colObj); pthread_mutex_destroy(&sIdx->mtx); #endif + free(sIdx->path); free(sIdx); return; } diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 882091afb9a64f271b0bad4ada809f13d285a82b..0e46445a00db0a2bcc4c40b03b4b6ea95fa64e52 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -20,7 +20,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later -#define MEM_TERM_LIMIT 200 +#define MEM_TERM_LIMIT 10000 * 10 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + @@ -110,7 +110,10 @@ void indexCacheDestroySkiplist(SSkipList* slt) { while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); - if (ct != NULL) {} + if (ct != NULL) { + free(ct->colVal); + free(ct); + } } tSkipListDestroyIter(iter); tSkipListDestroy(slt); @@ -271,7 +274,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)}; - indexCacheDebug(pCache); + // indexCacheDebug(pCache); int ret = indexQueryMem(mem, &ct, qtype, result, s); if (ret == 0 && *s != kTypeDeletion) { diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 18024fa39110c3a6acb3052e76e84c332ef425bf..04a08dafd2641f7fd1f91d660d8c4ff4013abeb2 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -354,7 +354,8 @@ uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size - FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes); } -uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans) { +uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, + uint64_t nTrans) { uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes); uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes; return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size @@ -403,8 +404,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i FstSlice* slice = &node->data; uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); - uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - node->nTrans - - (i * tSizes) - tSizes; + uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - + node->nTrans - (i * tSizes) - tSizes; uint8_t* data = fstSliceData(slice, NULL); return unpackDelta(data + at, tSizes, node->end); } @@ -595,7 +596,8 @@ FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) { n->isFinal = fstStateIsFinalState(&st); // s.is_final_state(); n->nTrans = nTrans; n->sizes = sz; - n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans); + n->finalOutput = + fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans); } return n; } @@ -875,9 +877,7 @@ void* fstBuilderInsertInner(FstBuilder* b) { // b->wrt = NULL; return b->wrt; } -void fstBuilderFinish(FstBuilder* b) { - fstBuilderInsertInner(b); -} +void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); } FstSlice fstNodeAsSlice(FstNode* node) { FstSlice* slice = &node->data; @@ -894,9 +894,7 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) { return trn; } -void fstLastTransitionDestroy(FstLastTransition* trn) { - free(trn); -} +void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); } void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) { FstLastTransition* trn = unNode->last; if (trn == NULL) { return; } @@ -959,9 +957,10 @@ Fst* fstCreate(FstSlice* slice) { fst->meta->checkSum = checkSum; FstSlice* s = calloc(1, sizeof(FstSlice)); - *s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice)); + *s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1); fst->data = s; + pthread_mutex_init(&fst->mtx, NULL); return fst; FST_CREAT_FAILED: @@ -973,14 +972,18 @@ void fstDestroy(Fst* fst) { free(fst->meta); fstSliceDestroy(fst->data); free(fst->data); + pthread_mutex_destroy(&fst->mtx); } free(fst); } bool fstGet(Fst* fst, FstSlice* b, Output* out) { + // dec lock range + pthread_mutex_lock(&fst->mtx); FstNode* root = fstGetRoot(fst); Output tOut = 0; int32_t len; + uint8_t* data = fstSliceData(b, &len); SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*)); @@ -988,7 +991,10 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { for (uint32_t i = 0; i < len; i++) { uint8_t inp = data[i]; Output res = 0; - if (false == fstNodeFindInput(root, inp, &res)) { return false; } + if (false == fstNodeFindInput(root, inp, &res)) { + pthread_mutex_unlock(&fst->mtx); + return false; + } FstTransition trn; fstNodeGetTransitionAt(root, res, &trn); @@ -997,6 +1003,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { taosArrayPush(nodes, &root); } if (!FST_NODE_IS_FINAL(root)) { + pthread_mutex_unlock(&fst->mtx); return false; } else { tOut = tOut + FST_NODE_FINAL_OUTPUT(root); @@ -1007,13 +1014,13 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { fstNodeDestroy(*node); } taosArrayDestroy(nodes); - fst->root = NULL; + pthread_mutex_unlock(&fst->mtx); *out = tOut; - return true; } FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) { + // refactor later return fstStreamBuilderCreate(fst, ctx); } StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { @@ -1021,24 +1028,30 @@ StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max); } FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) { + // refactor later return fstStreamBuilderCreate(fst, ctx); } FstNode* fstGetRoot(Fst* fst) { - if (fst->root != NULL) { return fst->root; } CompiledAddr rAddr = fstGetRootAddr(fst); - fst->root = fstGetNode(fst, rAddr); - return fst->root; + return fstGetNode(fst, rAddr); + // pthread_mutex_lock(&fst->mtx); + // if (fst->root != NULL) { + // // pthread_mutex_unlock(&fst->mtx); + // return fst->root; + //} + // CompiledAddr rAddr = fstGetRootAddr(fst); + // fst->root = fstGetNode(fst, rAddr); + //// pthread_mutex_unlock(&fst->mtx); + // return fst->root; } + FstNode* fstGetNode(Fst* fst, CompiledAddr addr) { + // refactor later return fstNodeCreate(fst->meta->version, addr, fst->data); } -FstType fstGetType(Fst* fst) { - return fst->meta->ty; -} -CompiledAddr fstGetRootAddr(Fst* fst) { - return fst->meta->rootAddr; -} +FstType fstGetType(Fst* fst) { return fst->meta->ty; } +CompiledAddr fstGetRootAddr(Fst* fst) { return fst->meta->rootAddr; } Output fstEmptyFinalOutput(Fst* fst, bool* null) { Output res = 0; @@ -1053,8 +1066,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) { } bool fstVerify(Fst* fst) { - uint32_t checkSum = fst->meta->checkSum; - int32_t len; + uint32_t len, checkSum = fst->meta->checkSum; uint8_t* data = fstSliceData(fst->data, &len); TSCKSUM initSum = 0; if (!taosCheckChecksumWhole(data, len)) { return false; } @@ -1094,15 +1106,12 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) { } } -bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { - return bound->type == Excluded ? false : true; -} +bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { return bound->type == Excluded ? false : true; } -void fstBoundDestroy(FstBoundWithData* bound) { - free(bound); -} +void fstBoundDestroy(FstBoundWithData* bound) { free(bound); } -StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max) { +StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, + FstBoundWithData* max) { StreamWithState* sws = calloc(1, sizeof(StreamWithState)); if (sws == NULL) { return NULL; } @@ -1131,7 +1140,9 @@ void streamWithStateDestroy(StreamWithState* sws) { bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { AutomationCtx* aut = sws->aut; if (fstBoundWithDataIsEmpty(min)) { - if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); } + if (fstBoundWithDataIsIncluded(min)) { + sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); + } StreamState s = {.node = fstGetRoot(sws->fst), .trans = 0, .out = {.null = false, .out = 0}, @@ -1203,7 +1214,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { uint64_t trans = s->trans; FstTransition trn; fstNodeGetTransitionAt(n, trans - 1, &trn); - StreamState s = {.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState}; + StreamState s = { + .node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState}; taosArrayPush(sws->stack, &s); return true; } @@ -1260,9 +1272,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb size_t isz = taosArrayGetSize(sws->inp); uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t)); - for (uint32_t i = 0; i < isz; i++) { - buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); - } + for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); } FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { taosArrayDestroyEx(sws->stack, streamStateDestroy); @@ -1327,8 +1337,8 @@ FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) { } void fstStreamBuilderDestroy(FstStreamBuilder* b) { fstSliceDestroy(&b->min->data); - tfree(b->min); fstSliceDestroy(&b->max->data); + tfree(b->min); tfree(b->max); free(b); } diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index 590ff294bf8f1841fc27e4519df1d23f668adcd5..c6e3cee3e3b375fe283e7333e726a224b011c91c 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -17,9 +17,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) { StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); - if (nsv == NULL) { - return NULL; - } + if (nsv == NULL) { return NULL; } nsv->kind = kind; nsv->type = ty; @@ -37,9 +35,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp } void startWithStateValueDestroy(void* val) { StartWithStateValue* sv = (StartWithStateValue*)val; - if (sv == NULL) { - return; - } + if (sv == NULL) { return; } if (sv->type == FST_INT) { // @@ -52,9 +48,7 @@ void startWithStateValueDestroy(void* val) { } StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); - if (nsv == NULL) { - return NULL; - } + if (nsv == NULL) { return NULL; } nsv->kind = sv->kind; nsv->type = sv->type; @@ -94,14 +88,10 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) { static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; } static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) { StartWithStateValue* ssv = (StartWithStateValue*)state; - if (ssv == NULL || ctx == NULL) { - return NULL; - } + if (ssv == NULL || ctx == NULL) { return NULL; } char* data = ctx->data; - if (ssv->kind == Done) { - return startWithStateValueCreate(Done, FST_INT, &ssv->val); - } + if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); } if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { int val = ssv->val + 1; @@ -138,9 +128,7 @@ AutomationFunc automFuncs[] = { AutomationCtx* automCtxCreate(void* data, AutomationType atype) { AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx)); - if (ctx == NULL) { - return NULL; - } + if (ctx == NULL) { return NULL; } StartWithStateValue* sv = NULL; if (atype == AUTOMATION_ALWAYS) { diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 710db563d9e39ddea08832c03db6b0628afc2cc2..7906dfea111ae3bdab56a774abab0226305dd530 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -42,8 +42,8 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) { int nRead = 0; if (ctx->type == TFile) { - tfLseek(ctx->file.fd, offset, 0); - nRead = tfRead(ctx->file.fd, buf, len); + // tfLseek(ctx->file.fd, offset, 0); + nRead = tfPread(ctx->file.fd, buf, len, offset); } else { // refactor later assert(0); @@ -52,6 +52,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off } static int writeCtxDoFlush(WriterCtx* ctx) { if (ctx->type == TFile) { + // taosFsyncFile(ctx->file.fd); tfFsync(ctx->file.fd); // tfFlush(ctx->file.fd); } else { @@ -69,13 +70,15 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int // ugly code, refactor later ctx->file.readOnly = readOnly; if (readOnly == false) { + // ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.fd = tfOpenCreateWriteAppend(path); } else { - ctx->file.fd = tfOpenReadWrite(path); + // ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); + ctx->file.fd = tfOpenRead(path); } memcpy(ctx->file.buf, path, strlen(path)); if (ctx->file.fd < 0) { - indexError("open file error %d", errno); + indexError("failed to open file, error %d", errno); goto END; } } else if (ctx->type == TMemory) { @@ -101,10 +104,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { free(ctx->mem.buf); } else { tfClose(ctx->file.fd); - if (remove) { - indexError("rm file %s", ctx->file.buf); - unlink(ctx->file.buf); - } + if (remove) { unlink(ctx->file.buf); } } free(ctx); } @@ -144,7 +144,8 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; } -int fstCountingWriterFlush(FstCountingWriter* write) { + +int fstCountingWriterFlush(FstCountingWriter* write) { WriterCtx* ctx = write->wrt; ctx->flush(ctx); // write->wtr->flush diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 17322e301e3b2227d610b7af45819d617b063a25..fc4f8593a105f5898fb977a8de0110bf83e5d6e5 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -53,13 +53,6 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); -static TFileReader* tfileReaderCreateImpl(WriterCtx* ctx) { - TFileReader* reader = tfileReaderCreate(ctx); - tfileReaderRef(reader); - // tfileSerialCacheKey(&key, buf); - return reader; -} - TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); if (tcache == NULL) { return NULL; } @@ -88,13 +81,16 @@ TFileCache* tfileCacheCreate(const char* path) { } char buf[128] = {0}; - TFileReader* reader = tfileReaderCreateImpl(wc); + TFileReader* reader = tfileReaderCreate(wc); TFileHeader* header = &reader->header; TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; tfileSerialCacheKey(&key, buf); + + tfileReaderRef(reader); + // indexTable taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); } taosArrayDestroyEx(files, tfileDestroyFileName); @@ -139,6 +135,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) if (p != NULL) { TFileReader* oldReader = *p; taosHashRemove(tcache->tableCache, buf, strlen(buf)); + oldReader->remove = true; tfileReaderUnRef(oldReader); } @@ -152,7 +149,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { // T_REF_INC(reader); reader->ctx = ctx; - if (0 != tfileReaderLoadHeader(reader)) { tfileReaderDestroy(reader); indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, @@ -172,7 +168,7 @@ void tfileReaderDestroy(TFileReader* reader) { if (reader == NULL) { return; } // T_REF_INC(reader); fstDestroy(reader->fst); - writerCtxDestroy(reader->ctx, true); + writerCtxDestroy(reader->ctx, reader->remove); free(reader); } @@ -232,7 +228,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); if (wc == NULL) { return NULL; } - TFileReader* reader = tfileReaderCreateImpl(wc); + TFileReader* reader = tfileReaderCreate(wc); return reader; // tfileSerialCacheKey(&key, buf); @@ -330,13 +326,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { return -1; } // write fst + indexError("--------Begin----------------"); for (size_t i = 0; i < sz; i++) { // TODO, fst batch write later TFileValue* v = taosArrayGetP((SArray*)data, i); if (tfileWriteData(tw, v) == 0) { // } + indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); } + indexError("--------End----------------"); fstBuilderFinish(tw->fb); fstBuilderDestroy(tw->fb); tw->fb = NULL; @@ -360,7 +359,10 @@ IndexTFile* indexTFileCreate(const char* path) { tfile->cache = tfileCacheCreate(path); return tfile; } -void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); } +void IndexTFileDestroy(IndexTFile* tfile) { + tfileCacheDestroy(tfile->cache); + free(tfile); +} int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int ret = -1; @@ -539,8 +541,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) { char buf[TFILE_HEADER_SIZE] = {0}; int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); - assert(nread == sizeof(buf)); + if (nread == -1) { + // + indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), + errno, reader->ctx->file.fd, reader->ctx->file.buf); + } + // assert(nread == sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf)); + return 0; } static int tfileReaderLoadFst(TFileReader* reader) { @@ -573,7 +581,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* char* buf = calloc(1, total); if (buf == NULL) { return -1; } - nread = ctx->read(ctx, buf, total); + nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid)); assert(total == nread); for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); } diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index a4d8bb36f11c86b155c599c94bb8ce79c9466c5e..da974ce6c4be500370eefe4b112f507b5e7912e6 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -1,6 +1,7 @@ #include #include +#include #include #include "index.h" #include "indexInt.h" @@ -42,7 +43,8 @@ class FstWriter { class FstReadMemory { public: - FstReadMemory(size_t size) { + FstReadMemory(size_t size, const std::string& fileName = fileName) { + tfInit(); _wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); _w = fstCountingWriterCreate(_wc); _size = size; @@ -101,6 +103,7 @@ class FstReadMemory { fstDestroy(_fst); fstSliceDestroy(&_s); writerCtxDestroy(_wc, false); + tfCleanup(); } private: @@ -165,8 +168,44 @@ void checkFstCheckIterator() { delete m; tfCleanup(); } -int main() { - checkFstCheckIterator(); + +void fst_get(Fst* fst) { + for (int i = 0; i < 10000; i++) { + std::string term = "Hello"; + FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size()); + uint64_t offset = 0; + bool ret = fstGet(fst, &key, &offset); + if (ret == false) { + std::cout << "not found" << std::endl; + } else { + std::cout << "found value:" << offset << std::endl; + } + } +} + +#define NUM_OF_THREAD 10 +void validateTFile(char* arg) { + tfInit(); + + std::thread threads[NUM_OF_THREAD]; + // std::vector threads; + TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1"); + + for (int i = 0; i < NUM_OF_THREAD; i++) { + threads[i] = std::thread(fst_get, reader->fst); + // threads.push_back(fst_get, reader->fst); + // std::thread t(fst_get, reader->fst); + } + for (int i = 0; i < NUM_OF_THREAD; i++) { + // wait join + threads[i].join(); + } + tfCleanup(); +} +int main(int argc, char* argv[]) { + if (argc > 1) { validateTFile(argv[1]); } + // checkFstCheckIterator(); // checkFstPrefixSearch(); + return 1; } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 588205c67ff95b4a1836b37c17681b33f2fbd414..080becccf183cd374ed833895c02b1f5a949803f 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include "index.h" #include "indexInt.h" #include "index_cache.h" @@ -25,6 +26,9 @@ #include "tskiplist.h" #include "tutil.h" using namespace std; + +#define NUM_OF_THREAD 10 + class DebugInfo { public: DebugInfo(const char* str) : info(str) { @@ -41,6 +45,7 @@ class DebugInfo { private: std::string info; }; + class FstWriter { public: FstWriter() { @@ -332,6 +337,8 @@ class TFileObj { TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) { colId_ = 10; + reader_ = NULL; + writer_ = NULL; // Do Nothing // } @@ -527,6 +534,7 @@ TEST_F(IndexCacheEnv, cache_test) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + // indexTermDestry(term); } { std::string colVal("v3"); @@ -634,6 +642,23 @@ class IndexObj { indexMultiTermDestroy(terms); return numOfTable; } + int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world", + size_t numOfTable = 100 * 10000) { + std::string tColVal = colVal; + for (int i = 0; i < numOfTable; i++) { + tColVal[tColVal.size() - 1] = 'a' + i % 26; + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 10; i++) { + int ret = Put(terms, i); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + return numOfTable; + } int Put(SIndexMultiTerm* fvs, uint64_t uid) { numOfWrite += taosArrayGetSize(fvs); @@ -656,6 +681,14 @@ class IndexObj { return taosArrayGetSize(result); // assert(taosArrayGetSize(result) == targetSize); } + void PutOne(const std::string& colName, const std::string& colVal) { + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + Put(terms, 10); + indexMultiTermDestroy(terms); + } void Debug() { std::cout << "numOfWrite:" << numOfWrite << std::endl; std::cout << "numOfRead:" << numOfRead << std::endl; @@ -687,7 +720,7 @@ class IndexEnv2 : public ::testing::Test { IndexObj* index; }; TEST_F(IndexEnv2, testIndexOpen) { - std::string path = "/tmp"; + std::string path = "/tmp/test"; if (index->Init(path) != 0) { std::cout << "failed to init index" << std::endl; exit(1); @@ -723,10 +756,24 @@ TEST_F(IndexEnv2, testIndexOpen) { } indexMultiTermDestroy(terms); } - { + size_t size = 200; std::string colName("tag1"), colVal("Hello"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = size * 3; i < size * 4; i++) { + int tableId = i; + int ret = index->Put(terms, tableId); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + + { + std::string colName("tag1"), colVal("Hello"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); @@ -735,21 +782,44 @@ TEST_F(IndexEnv2, testIndexOpen) { SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); index->Search(mq, result); std::cout << "target size: " << taosArrayGetSize(result) << std::endl; - // assert(taosArrayGetSize(result) == targetSize); + assert(taosArrayGetSize(result) == 400); } } TEST_F(IndexEnv2, testIndex_TrigeFlush) { - std::string path = "/tmp"; - if (index->Init(path) != 0) {} + std::string path = "/tmp/test"; + if (index->Init(path) != 0) { + // r + std::cout << "failed to init" << std::endl; + } int numOfTable = 100 * 10000; - index->WriteMillonData("tag1", "Hello world", numOfTable); - int target = index->SearchOne("tag1", "Hello world"); + index->WriteMillonData("tag1", "Hello", numOfTable); + int target = index->SearchOne("tag1", "Hello"); assert(numOfTable == target); } + +static void write_and_search(IndexObj* idx) { + std::string colName("tag1"), colVal("Hello"); + + int target = idx->SearchOne("tag1", "Hello"); + idx->PutOne(colName, colVal); +} TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { - std::string path = "/tmp"; - if (index->Init(path) != 0) {} + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + // opt + } + index->WriteMultiMillonData("tag1", "Hello", 200000); + std::thread threads[NUM_OF_THREAD]; + + for (int i = 0; i < NUM_OF_THREAD; i++) { + // + threads[i] = std::thread(write_and_search, index); + } + for (int i = 0; i < NUM_OF_THREAD; i++) { + // TOD + threads[i].join(); + } } TEST_F(IndexEnv2, testIndex_multi_thread_write) { std::string path = "/tmp"; @@ -769,4 +839,7 @@ TEST_F(IndexEnv2, testIndex_performance) { std::string path = "/tmp"; if (index->Init(path) != 0) {} } -TEST_F(IndexEnv2, testIndexMultiTag) {} +TEST_F(IndexEnv2, testIndexMultiTag) { + std::string path = "/tmp"; + if (index->Init(path) != 0) {} +} diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 313f1d97afbf3f60a253105dfcac23a91af131e9..4cb20802c78176b8cd6791f463ed8abeda3df7d7 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -16,21 +16,19 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" -#include "ulog.h" -#include "tutil.h" #include "tref.h" +#include "tutil.h" +#include "ulog.h" static int32_t tsFileRsetId = -1; static int8_t tfInited = 0; -static void tfCloseFile(void *p) { - taosCloseFile((int32_t)(uintptr_t)p); -} +static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); } int32_t tfInit() { int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1); - if(old == 1) return 0; + if (old == 1) return 0; tsFileRsetId = taosOpenRef(2000, tfCloseFile); if (tsFileRsetId > 0) { return 0; @@ -79,9 +77,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode return tfOpenImp(fd); } -int64_t tfClose(int64_t tfd) { - return taosRemoveRef(tsFileRsetId, tfd); -} +int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); } int64_t tfWrite(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); @@ -109,6 +105,19 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) { return ret; } +int64_t tfPread(int64_t tfd, void *buf, int64_t count, int32_t offset) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + + int64_t ret = pread(fd, buf, count, offset); + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + int32_t tfFsync(int64_t tfd) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 04bc0c8dc8a4d84e4b4454db894580b1e2cc9d55..75f5e9cdbcb96198661a2f863a7d42d4f886b965 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -14,26 +14,29 @@ */ #include "os.h" -#include "ulog.h" + #include "taoserror.h" #include "tqueue.h" +#include "ulog.h" + +typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { - struct STaosQnode *next; - char item[]; + STaosQnode *next; + char item[]; } STaosQnode; typedef struct STaosQueue { - int32_t itemSize; - int32_t numOfItems; - struct STaosQnode *head; - struct STaosQnode *tail; - struct STaosQueue *next; // for queue set - struct STaosQset *qset; // for queue set - void *ahandle; // for queue set - FProcessItem itemFp; - FProcessItems itemsFp; - pthread_mutex_t mutex; + int32_t itemSize; + int32_t numOfItems; + STaosQnode *head; + STaosQnode *tail; + STaosQueue *next; // for queue set + STaosQset *qset; // for queue set + void *ahandle; // for queue set + FProcessItem itemFp; + FProcessItems itemsFp; + pthread_mutex_t mutex; } STaosQueue; typedef struct STaosQset { @@ -52,8 +55,8 @@ typedef struct STaosQall { int32_t numOfItems; } STaosQall; -taos_queue taosOpenQueue() { - STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1); +STaosQueue *taosOpenQueue() { + STaosQueue *queue = calloc(sizeof(STaosQueue), 1); if (queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -65,16 +68,14 @@ taos_queue taosOpenQueue() { return queue; } -void taosSetQueueFp(taos_queue param, FProcessItem itemFp, FProcessItems itemsFp) { - if (param == NULL) return; - STaosQueue *queue = (STaosQueue *)param; +void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) { + if (queue == NULL) return; queue->itemFp = itemFp; queue->itemsFp = itemsFp; } -void taosCloseQueue(taos_queue param) { - if (param == NULL) return; - STaosQueue *queue = (STaosQueue *)param; +void taosCloseQueue(STaosQueue *queue) { + if (queue == NULL) return; STaosQnode *pTemp; STaosQset *qset; @@ -98,9 +99,8 @@ void taosCloseQueue(taos_queue param) { uTrace("queue:%p is closed", queue); } -bool taosQueueEmpty(taos_queue param) { - if (param == NULL) return true; - STaosQueue *queue = (STaosQueue *)param; +bool taosQueueEmpty(STaosQueue *queue) { + if (queue == NULL) return true; bool empty = false; pthread_mutex_lock(&queue->mutex); @@ -112,7 +112,7 @@ bool taosQueueEmpty(taos_queue param) { return empty; } -void *taosAllocateQitem(int size) { +void *taosAllocateQitem(int32_t size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); if (pNode == NULL) return NULL; @@ -129,9 +129,8 @@ void taosFreeQitem(void *param) { free(temp); } -int taosWriteQitem(taos_queue param, void *item) { - STaosQueue *queue = (STaosQueue *)param; - STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { + STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode)); pNode->next = NULL; pthread_mutex_lock(&queue->mutex); @@ -146,7 +145,7 @@ int taosWriteQitem(taos_queue param, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems); + uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -155,22 +154,21 @@ int taosWriteQitem(taos_queue param, void *item) { return 0; } -int taosReadQitem(taos_queue param, void **pitem) { - STaosQueue *queue = (STaosQueue *)param; +int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { STaosQnode *pNode = NULL; - int code = 0; + int32_t code = 0; pthread_mutex_lock(&queue->mutex); if (queue->head) { pNode = queue->head; - *pitem = pNode->item; + *ppItem = pNode->item; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; - uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -178,18 +176,13 @@ int taosReadQitem(taos_queue param, void **pitem) { return code; } -void *taosAllocateQall() { - void *p = calloc(sizeof(STaosQall), 1); - return p; -} +STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); } -void taosFreeQall(void *param) { free(param); } +void taosFreeQall(STaosQall *qall) { free(qall); } -int taosReadAllQitems(taos_queue param, taos_qall p2) { - STaosQueue *queue = (STaosQueue *)param; - STaosQall *qall = (STaosQall *)p2; - int code = 0; - bool empty; +int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { + int32_t code = 0; + bool empty; pthread_mutex_lock(&queue->mutex); @@ -219,29 +212,25 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) { return code; } -int taosGetQitem(taos_qall param, void **pitem) { - STaosQall *qall = (STaosQall *)param; +int32_t taosGetQitem(STaosQall *qall, void **ppItem) { STaosQnode *pNode; - int num = 0; + int32_t num = 0; pNode = qall->current; if (pNode) qall->current = pNode->next; if (pNode) { - *pitem = pNode->item; + *ppItem = pNode->item; num = 1; - uTrace("item:%p is fetched", *pitem); + uTrace("item:%p is fetched", *ppItem); } return num; } -void taosResetQitems(taos_qall param) { - STaosQall *qall = (STaosQall *)param; - qall->current = qall->start; -} +void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } -taos_qset taosOpenQset() { +STaosQset *taosOpenQset() { STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); if (qset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -255,9 +244,8 @@ taos_qset taosOpenQset() { return qset; } -void taosCloseQset(taos_qset param) { - if (param == NULL) return; - STaosQset *qset = (STaosQset *)param; +void taosCloseQset(STaosQset *qset) { + if (qset == NULL) return; // remove all the queues from qset pthread_mutex_lock(&qset->mutex); @@ -279,16 +267,12 @@ void taosCloseQset(taos_qset param) { // tsem_post 'qset->sem', so that reader threads waiting for it // resumes execution and return, should only be used to signal the // thread to exit. -void taosQsetThreadResume(taos_qset param) { - STaosQset *qset = (STaosQset *)param; +void taosQsetThreadResume(STaosQset *qset) { uDebug("qset:%p, it will exit", qset); tsem_post(&qset->sem); } -int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { - STaosQueue *queue = (STaosQueue *)p2; - STaosQset *qset = (STaosQset *)p1; - +int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) { if (queue->qset) return -1; pthread_mutex_lock(&qset->mutex); @@ -309,10 +293,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { return 0; } -void taosRemoveFromQset(taos_qset p1, taos_queue p2) { - STaosQueue *queue = (STaosQueue *)p2; - STaosQset *qset = (STaosQset *)p1; - +void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { STaosQueue *tqueue = NULL; pthread_mutex_lock(&qset->mutex); @@ -353,18 +334,17 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) { uTrace("queue:%p is removed from qset:%p", queue, qset); } -int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; } +int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessItem *itemFp) { - STaosQset *qset = (STaosQset *)param; +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) { STaosQnode *pNode = NULL; - int code = 0; + int32_t code = 0; tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); - for (int i = 0; i < qset->numOfQueues; ++i) { + for (int32_t i = 0; i < qset->numOfQueues; ++i) { if (qset->current == NULL) qset->current = qset->head; STaosQueue *queue = qset->current; if (queue) qset->current = queue->next; @@ -375,7 +355,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces if (queue->head) { pNode = queue->head; - *pitem = pNode->item; + *ppItem = pNode->item; if (ahandle) *ahandle = queue->ahandle; if (itemFp) *itemFp = queue->itemFp; queue->head = pNode->next; @@ -383,7 +363,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -395,18 +375,15 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces return code; } -int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessItems *itemsFp) { - STaosQset *qset = (STaosQset *)param; +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) { STaosQueue *queue; - STaosQall *qall = (STaosQall *)p2; - int code = 0; + int32_t code = 0; tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); - for(int i=0; inumOfQueues; ++i) { - if (qset->current == NULL) - qset->current = qset->head; + for (int32_t i = 0; i < qset->numOfQueues; ++i) { + if (qset->current == NULL) qset->current = qset->head; queue = qset->current; if (queue) qset->current = queue->next; if (queue == NULL) break; @@ -427,34 +404,32 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FPr queue->tail = NULL; queue->numOfItems = 0; atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); - for (int j=1; jnumOfItems; ++j) tsem_wait(&qset->sem); - } + for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem); + } pthread_mutex_unlock(&queue->mutex); - if (code != 0) break; + if (code != 0) break; } pthread_mutex_unlock(&qset->mutex); return code; } -int taosGetQueueItemsNumber(taos_queue param) { - STaosQueue *queue = (STaosQueue *)param; +int32_t taosGetQueueItemsNumber(STaosQueue *queue) { if (!queue) return 0; - int num; + int32_t num; pthread_mutex_lock(&queue->mutex); num = queue->numOfItems; pthread_mutex_unlock(&queue->mutex); return num; } -int taosGetQsetItemsNumber(taos_qset param) { - STaosQset *qset = (STaosQset *)param; +int32_t taosGetQsetItemsNumber(STaosQset *qset) { if (!qset) return 0; - int num = 0; + int32_t num = 0; pthread_mutex_lock(&qset->mutex); num = qset->numOfItems; pthread_mutex_unlock(&qset->mutex); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index fb7b71b845e5f5a5584f651e14b20d67fa3d6eb4..ed74041712ce0022eb66ac5c9331b88fda961222 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -85,9 +85,9 @@ static void *tWorkerThreadFp(SWorker *worker) { return NULL; } -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { +STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { pthread_mutex_lock(&pool->mutex); - taos_queue queue = taosOpenQueue(); + STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); return NULL; @@ -121,7 +121,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) return queue; } -void tWorkerFreeQueue(SWorkerPool *pool, void *queue) { +void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); } @@ -195,11 +195,11 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { return NULL; } -taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { +STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { pthread_mutex_lock(&pool->mutex); SMWorker *worker = pool->workers + pool->nextId; - taos_queue *queue = taosOpenQueue(); + STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); return NULL; @@ -250,7 +250,7 @@ taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems f return queue; } -void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) { +void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); }