diff --git a/contrib/test/craft/raftServer.h b/contrib/test/craft/raftServer.h index 5fccde6bf252ecd15d58748d399b118e1291370a..f4087cf1a9b8d664a56c58bf797e01f60cba032a 100644 --- a/contrib/test/craft/raftServer.h +++ b/contrib/test/craft/raftServer.h @@ -48,10 +48,11 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, int32_t raftServerStart(SRaftServer *pRaftServer); void raftServerClose(SRaftServer *pRaftServer); - int initFsm(struct raft_fsm *fsm); - +const char* state2String(unsigned short state); +void printRaftConfiguration(struct raft_configuration *c); +void printRaftState(struct raft *r); #ifdef __cplusplus diff --git a/contrib/test/traft/CMakeLists.txt b/contrib/test/traft/CMakeLists.txt index e29fea04f187d7fbd220978e0539c8929d7b7a5c..d165c6d4dc7166ae0618e6a210da79fc2661e3b7 100644 --- a/contrib/test/traft/CMakeLists.txt +++ b/contrib/test/traft/CMakeLists.txt @@ -1,7 +1,3 @@ -add_executable(raftMain "") -target_sources(raftMain - PRIVATE - "raftMain.c" - "raftServer.c" -) -target_link_libraries(raftMain PUBLIC traft lz4 uv_a) +add_subdirectory(rebalance_leader) +add_subdirectory(make_cluster) + diff --git a/contrib/test/traft/make_cluster/CMakeLists.txt b/contrib/test/traft/make_cluster/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..afd19d5435a9cfa580fa1188b90c053d250ad1f2 --- /dev/null +++ b/contrib/test/traft/make_cluster/CMakeLists.txt @@ -0,0 +1,11 @@ +add_executable(makeCluster "") +target_sources(makeCluster + PRIVATE + "raftMain.c" + "raftServer.c" + "config.c" + "console.c" + "simpleHash.c" + "util.c" +) +target_link_libraries(makeCluster PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/clear.sh b/contrib/test/traft/make_cluster/clear.sh similarity index 100% rename from contrib/test/traft/clear.sh rename to contrib/test/traft/make_cluster/clear.sh diff --git a/contrib/test/traft/make_cluster/common.h b/contrib/test/traft/make_cluster/common.h new file mode 100644 index 0000000000000000000000000000000000000000..df7422033acdc6ccd1f6c2c34c07781edb94c384 --- /dev/null +++ b/contrib/test/traft/make_cluster/common.h @@ -0,0 +1,23 @@ +#ifndef TRAFT_COMMON_H +#define TRAFT_COMMON_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +#define COMMAND_LEN 512 +#define MAX_CMD_COUNT 10 +#define TOKEN_LEN 128 +#define MAX_PEERS_COUNT 19 + +#define HOST_LEN 64 +#define ADDRESS_LEN (HOST_LEN * 2) +#define BASE_DIR_LEN 128 + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/make_cluster/config.c b/contrib/test/traft/make_cluster/config.c new file mode 100644 index 0000000000000000000000000000000000000000..3b96839fd962a98248080584927caed762e52007 --- /dev/null +++ b/contrib/test/traft/make_cluster/config.c @@ -0,0 +1,64 @@ +#include "config.h" +#include +#include +#include + +void addrToString(const char *host, uint16_t port, char *addr, int len) { snprintf(addr, len, "%s:%hu", host, port); } + +void parseAddr(const char *addr, char *host, int len, uint16_t *port) { + char *tmp = (char *)malloc(strlen(addr) + 1); + strcpy(tmp, addr); + + char *context; + char *separator = ":"; + char *token = strtok_r(tmp, separator, &context); + if (token) { + snprintf(host, len, "%s", token); + } + + token = strtok_r(NULL, separator, &context); + if (token) { + sscanf(token, "%hu", port); + } + + free(tmp); +} + +int parseConf(int argc, char **argv, RaftServerConfig *pConf) { + memset(pConf, 0, sizeof(*pConf)); + + int option_index, option_value; + option_index = 0; + static struct option long_options[] = {{"help", no_argument, NULL, 'h'}, + {"addr", required_argument, NULL, 'a'}, + {"dir", required_argument, NULL, 'd'}, + {NULL, 0, NULL, 0}}; + + while ((option_value = getopt_long(argc, argv, "ha:d:", long_options, &option_index)) != -1) { + switch (option_value) { + case 'a': { + parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); + break; + } + + case 'd': { + snprintf(pConf->baseDir, sizeof(pConf->baseDir), "%s", optarg); + break; + } + + case 'h': { + return -2; + } + + default: { return -2; } + } + } + + return 0; +} + +void printConf(RaftServerConfig *pConf) { + printf("\n---printConf: \n"); + printf("me: [%s:%hu] \n", pConf->me.host, pConf->me.port); + printf("dataDir: [%s] \n\n", pConf->baseDir); +} diff --git a/contrib/test/traft/make_cluster/config.h b/contrib/test/traft/make_cluster/config.h new file mode 100644 index 0000000000000000000000000000000000000000..13c43d0d285364065b558a52d2e6292ee21c5af2 --- /dev/null +++ b/contrib/test/traft/make_cluster/config.h @@ -0,0 +1,31 @@ +#ifndef TRAFT_CONFIG_H +#define TRAFT_CONFIG_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "common.h" + +typedef struct { + char host[HOST_LEN]; + uint16_t port; +} Addr; + +typedef struct { + Addr me; + char baseDir[BASE_DIR_LEN]; +} RaftServerConfig; + +void addrToString(const char *host, uint16_t port, char *addr, int len); +void parseAddr(const char *addr, char *host, int len, uint16_t *port); +int parseConf(int argc, char **argv, RaftServerConfig *pConf); +void printConf(RaftServerConfig *pConf); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/make_cluster/console.c b/contrib/test/traft/make_cluster/console.c new file mode 100644 index 0000000000000000000000000000000000000000..b00550c681f33ed6807ce9c97bc6c24b600c9526 --- /dev/null +++ b/contrib/test/traft/make_cluster/console.c @@ -0,0 +1,202 @@ +#include "console.h" +#include +#include +#include +#include "raftServer.h" +#include "util.h" + +void printHelp() { + printf("---------------------\n"); + printf("help: \n\n"); + printf("create a vgroup with 3 replicas: \n"); + printf("create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 \n"); + printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10002 \n"); + printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10001 \n"); + printf("\n"); + printf("create a vgroup with only one replica: \n"); + printf("create vnode voter vid 200 \n"); + printf("\n"); + printf("add vnode into vgroup: \n"); + printf("create vnode spare vid 100 ---- run at 127.0.0.1:10003\n"); + printf("join vnode vid 100 addr 127.0.0.1:10003 ---- run at leader of vgroup 100\n"); + printf("\n"); + printf("run \n"); + printf("put 0 key value \n"); + printf("get 0 key \n"); + printf("---------------------\n"); +} + +void console(RaftServer *pRaftServer) { + while (1) { + int ret; + char cmdBuf[COMMAND_LEN]; + memset(cmdBuf, 0, sizeof(cmdBuf)); + printf("(console)> "); + char *retp = fgets(cmdBuf, COMMAND_LEN, stdin); + if (!retp) { + exit(-1); + } + + int pos = strlen(cmdBuf); + if (cmdBuf[pos - 1] == '\n') { + cmdBuf[pos - 1] = '\0'; + } + + if (strncmp(cmdBuf, "", COMMAND_LEN) == 0) { + continue; + } + + char cmds[MAX_CMD_COUNT][TOKEN_LEN]; + memset(cmds, 0, sizeof(cmds)); + + int cmdCount; + cmdCount = splitString(cmdBuf, " ", cmds, MAX_CMD_COUNT); + + if (strcmp(cmds[0], "create") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[3], "vid") == 0) { + uint16_t vid; + sscanf(cmds[4], "%hu", &vid); + + if (strcmp(cmds[2], "voter") == 0) { + char peers[MAX_PEERS_COUNT][ADDRESS_LEN]; + memset(peers, 0, sizeof(peers)); + uint32_t peersCount = 0; + + if (strcmp(cmds[5], "peers") == 0 && cmdCount > 6) { + // create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 + for (int i = 6; i < cmdCount; ++i) { + snprintf(peers[i - 6], ADDRESS_LEN, "%s", cmds[i]); + peersCount++; + } + } else { + // create vnode voter vid 200 + } + ret = addRaftVoter(pRaftServer, peers, peersCount, vid); + if (ret == 0) { + printf("create vnode voter ok \n"); + } else { + printf("create vnode voter error \n"); + } + } else if (strcmp(cmds[2], "spare") == 0) { + ret = addRaftSpare(pRaftServer, vid); + if (ret == 0) { + printf("create vnode spare ok \n"); + } else { + printf("create vnode spare error \n"); + } + } else { + printHelp(); + } + + } else if (strcmp(cmds[0], "join") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[2], "vid") == 0 && + strcmp(cmds[4], "addr") == 0 && cmdCount == 6) { + // join vnode vid 100 addr 127.0.0.1:10004 + + char * address = cmds[5]; + char host[64]; + uint16_t port; + parseAddr(address, host, sizeof(host), &port); + + uint16_t vid; + sscanf(cmds[3], "%hu", &vid); + + HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); + if (*pp == NULL) { + printf("vid:%hu not found \n", vid); + break; + } + RaftInstance *pRaftInstance = (*pp)->data; + + uint64_t destRaftId = encodeRaftId(host, port, vid); + + struct raft_change *req = raft_malloc(sizeof(*req)); + RaftJoin * pRaftJoin = raft_malloc(sizeof(*pRaftJoin)); + pRaftJoin->r = &pRaftInstance->raft; + pRaftJoin->joinId = destRaftId; + req->data = pRaftJoin; + ret = raft_add(&pRaftInstance->raft, req, destRaftId, address, raftChangeAddCb); + if (ret != 0) { + printf("raft_add error: %s \n", raft_errmsg(&pRaftInstance->raft)); + } + + } else if (strcmp(cmds[0], "dropnode") == 0) { + } else if (strcmp(cmds[0], "state") == 0) { + pRaftServer->raftInstances.print(&pRaftServer->raftInstances); + for (size_t i = 0; i < pRaftServer->raftInstances.length; ++i) { + HashNode *ptr = pRaftServer->raftInstances.table[i]; + if (ptr != NULL) { + while (ptr != NULL) { + RaftInstance *pRaftInstance = ptr->data; + printf("instance vid:%hu raftId:%llu \n", ptr->vgroupId, pRaftInstance->raftId); + printRaftState(&pRaftInstance->raft); + printf("\n"); + ptr = ptr->next; + } + printf("\n"); + } + } + + } else if (strcmp(cmds[0], "put") == 0 && cmdCount == 4) { + uint16_t vid; + sscanf(cmds[1], "%hu", &vid); + char * key = cmds[2]; + char * value = cmds[3]; + HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); + if (*pp == NULL) { + printf("vid:%hu not found \n", vid); + break; + } + RaftInstance *pRaftInstance = (*pp)->data; + + char *raftValue = malloc(TOKEN_LEN * 2 + 3); + snprintf(raftValue, TOKEN_LEN * 2 + 3, "%s--%s", key, value); + putValue(&pRaftInstance->raft, raftValue); + free(raftValue); + + } else if (strcmp(cmds[0], "run") == 0) { + pthread_t tidRaftServer; + pthread_create(&tidRaftServer, NULL, startServerFunc, pRaftServer); + + } else if (strcmp(cmds[0], "get") == 0 && cmdCount == 3) { + uint16_t vid; + sscanf(cmds[1], "%hu", &vid); + char * key = cmds[2]; + HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); + if (*pp == NULL) { + printf("vid:%hu not found \n", vid); + break; + } + RaftInstance * pRaftInstance = (*pp)->data; + SimpleHash * pKV = pRaftInstance->fsm.data; + SimpleHashNode **ppNode = pKV->find_cstr(pKV, key); + if (*ppNode == NULL) { + printf("key:%s not found \n", key); + } else { + printf("find key:%s value:%s \n", key, (char *)((*ppNode)->data)); + } + + } else if (strcmp(cmds[0], "transfer") == 0) { + } else if (strcmp(cmds[0], "state") == 0) { + } else if (strcmp(cmds[0], "snapshot") == 0) { + } else if (strcmp(cmds[0], "exit") == 0) { + exit(0); + + } else if (strcmp(cmds[0], "quit") == 0) { + exit(0); + + } else if (strcmp(cmds[0], "help") == 0) { + printHelp(); + + } else { + printf("unknown command: %s \n", cmdBuf); + printHelp(); + } + + /* + printf("cmdBuf: [%s] \n", cmdBuf); + printf("cmdCount : %d \n", cmdCount); + for (int i = 0; i < MAX_CMD_COUNT; ++i) { + printf("cmd%d : %s \n", i, cmds[i]); + } + */ + } +} diff --git a/contrib/test/traft/make_cluster/console.h b/contrib/test/traft/make_cluster/console.h new file mode 100644 index 0000000000000000000000000000000000000000..f9ed12baf5e9c674f906c78de38c39fd80d1e71f --- /dev/null +++ b/contrib/test/traft/make_cluster/console.h @@ -0,0 +1,19 @@ +#ifndef TRAFT_CONSOLE_H +#define TRAFT_CONSOLE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "common.h" +#include "raftServer.h" + +void console(RaftServer *pRaftServer); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/make_cluster/raftMain.c b/contrib/test/traft/make_cluster/raftMain.c new file mode 100644 index 0000000000000000000000000000000000000000..e25636de91ae5d634e3341b4b70a1868088a751d --- /dev/null +++ b/contrib/test/traft/make_cluster/raftMain.c @@ -0,0 +1,81 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common.h" +#include "config.h" +#include "console.h" +#include "raftServer.h" +#include "simpleHash.h" +#include "util.h" + +const char *exe_name; + +void *startConsoleFunc(void *param) { + RaftServer *pRaftServer = (RaftServer *)param; + console(pRaftServer); + return NULL; +} + +void usage() { + printf("\nusage: \n"); + printf("%s --addr=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("\n"); +} + +RaftServerConfig gConfig; +RaftServer gRaftServer; + +int main(int argc, char **argv) { + srand(time(NULL)); + int32_t ret; + + exe_name = argv[0]; + if (argc < 3) { + usage(); + exit(-1); + } + + ret = parseConf(argc, argv, &gConfig); + if (ret != 0) { + usage(); + exit(-1); + } + printConf(&gConfig); + + if (!dirOK(gConfig.baseDir)) { + ret = mkdir(gConfig.baseDir, 0775); + if (ret != 0) { + fprintf(stderr, "mkdir error, %s \n", gConfig.baseDir); + exit(-1); + } + } + + ret = raftServerInit(&gRaftServer, &gConfig); + if (ret != 0) { + fprintf(stderr, "raftServerInit error \n"); + exit(-1); + } + + /* + pthread_t tidRaftServer; + pthread_create(&tidRaftServer, NULL, startServerFunc, &gRaftServer); + */ + + pthread_t tidConsole; + pthread_create(&tidConsole, NULL, startConsoleFunc, &gRaftServer); + + while (1) { + sleep(10); + } + + return 0; +} diff --git a/contrib/test/traft/make_cluster/raftServer.c b/contrib/test/traft/make_cluster/raftServer.c new file mode 100644 index 0000000000000000000000000000000000000000..bbf67b94207d73c0d7f56157072f9a506f1ec1bd --- /dev/null +++ b/contrib/test/traft/make_cluster/raftServer.c @@ -0,0 +1,286 @@ +#include "raftServer.h" +#include +#include +#include "common.h" +#include "simpleHash.h" +#include "util.h" + +void *startServerFunc(void *param) { + RaftServer *pRaftServer = (RaftServer *)param; + int32_t r = raftServerStart(pRaftServer); + assert(r == 0); + return NULL; +} + +void raftChangeAssignCb(struct raft_change *req, int status) { + struct raft *r = req->data; + if (status != 0) { + printf("raftChangeAssignCb error: %s \n", raft_errmsg(r)); + } else { + printf("raftChangeAssignCb ok \n"); + } + raft_free(req); +} + +void raftChangeAddCb(struct raft_change *req, int status) { + RaftJoin *pRaftJoin = req->data; + if (status != 0) { + printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r)); + } else { + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + req2->data = pRaftJoin->r; + int ret = raft_assign(pRaftJoin->r, req2, pRaftJoin->joinId, RAFT_VOTER, raftChangeAssignCb); + if (ret != 0) { + printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r)); + } + } + raft_free(req->data); + raft_free(req); +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { + // get fsm data + SimpleHash *sh = pFsm->data; + + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: [%s] \n", msg); + char arr[2][TOKEN_LEN]; + int r = splitString(msg, "--", arr, 2); + assert(r == 2); + + // do the value on fsm + sh->insert_cstr(sh, arr[0], arr[1]); + + raft_free(buf->base); + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + struct raft *r = req->data; + if (status != 0) { + printf("putValueCb error: %s \n", raft_errmsg(r)); + } else { + printf("putValueCb: %s \n", "ok"); + } + raft_free(req); +} + +void putValue(struct raft *r, const char *value) { + struct raft_buffer buf; + + buf.len = strlen(value) + 1; + buf.base = raft_malloc(buf.len); + snprintf(buf.base, buf.len, "%s", value); + + struct raft_apply *req = raft_malloc(sizeof(*req)); + req->data = r; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +const char *state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + } + return "UNKNOWN_RAFT_STATE"; +} + +void printRaftConfiguration(struct raft_configuration *c) { + printf("configuration: \n"); + for (int i = 0; i < c->n; ++i) { + printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); + } +} + +void printRaftState(struct raft *r) { + printf("----Raft State: -----------\n"); + printf("mem_addr: %p \n", r); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + + printf("configuration_index: %llu \n", r->configuration_index); + printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); + printRaftConfiguration(&r->configuration); + + printf("----------------------------\n"); +} + +int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid) { + int ret; + + RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance)); + assert(pRaftInstance != NULL); + + // init raftId + pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); + + // init dir + snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir, + pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId); + + if (!dirOK(pRaftInstance->dir)) { + ret = mkdir(pRaftInstance->dir, 0775); + if (ret != 0) { + fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir); + assert(0); + } + } + + // init fsm + pRaftInstance->fsm.data = newSimpleHash(2); + pRaftInstance->fsm.apply = fsmApplyCb; + + // init io + ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport); + if (ret != 0) { + fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft + ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId, + pRaftServer->address); + if (ret != 0) { + fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft_configuration + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_VOTER); + for (int i = 0; i < peersCount; ++i) { + char * peerAddress = peers[i]; + char host[64]; + uint16_t port; + parseAddr(peerAddress, host, sizeof(host), &port); + uint64_t raftId = encodeRaftId(host, port, vid); + raft_configuration_add(&conf, raftId, peers[i], RAFT_VOTER); + } + raft_bootstrap(&pRaftInstance->raft, &conf); + + // start raft + ret = raft_start(&pRaftInstance->raft); + if (ret != 0) { + fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // add raft instance into raft server + pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance); + + return 0; +} + +int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid) { + int ret; + + RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance)); + assert(pRaftInstance != NULL); + + // init raftId + pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); + + // init dir + snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir, + pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId); + ret = mkdir(pRaftInstance->dir, 0775); + if (ret != 0) { + fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir); + assert(0); + } + + // init fsm + pRaftInstance->fsm.data = newSimpleHash(2); + pRaftInstance->fsm.apply = fsmApplyCb; + + // init io + ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport); + if (ret != 0) { + fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft + ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId, + pRaftServer->address); + if (ret != 0) { + fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft_configuration + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_SPARE); + raft_bootstrap(&pRaftInstance->raft, &conf); + + // start raft + ret = raft_start(&pRaftInstance->raft); + if (ret != 0) { + fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // add raft instance into raft server + pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance); + + return 0; +} + +int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf) { + int ret; + + // init host, port, address, dir + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); + pRaftServer->port = pConf->me.port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); + snprintf(pRaftServer->baseDir, sizeof(pRaftServer->baseDir), "%s", pConf->baseDir); + + // init loop + ret = uv_loop_init(&pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret)); + assert(0); + } + + // init network + ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret); + assert(0); + } + + // init raft instance container + initIdHash(&pRaftServer->raftInstances, 2); + + return 0; +} + +int32_t raftServerStart(RaftServer *pRaftServer) { + // start loop + uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); + return 0; +} + +void raftServerStop(RaftServer *pRaftServer) {} diff --git a/contrib/test/traft/make_cluster/raftServer.h b/contrib/test/traft/make_cluster/raftServer.h new file mode 100644 index 0000000000000000000000000000000000000000..b6dbddb2b7b6125db0eb8443aa1eb0be8954fe26 --- /dev/null +++ b/contrib/test/traft/make_cluster/raftServer.h @@ -0,0 +1,66 @@ +#ifndef TDENGINE_RAFT_SERVER_H +#define TDENGINE_RAFT_SERVER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include "common.h" +#include "config.h" +#include "raft.h" +#include "raft/uv.h" +#include "simpleHash.h" + +typedef struct RaftJoin { + struct raft *r; + raft_id joinId; +} RaftJoin; + +typedef struct { + raft_id raftId; + char dir[BASE_DIR_LEN * 2]; + struct raft_fsm fsm; + struct raft_io io; + struct raft raft; +} RaftInstance; + +typedef struct { + char host[HOST_LEN]; + uint16_t port; + char address[ADDRESS_LEN]; /* Raft instance address */ + char baseDir[BASE_DIR_LEN]; /* Raft instance address */ + + struct uv_loop_s loop; /* UV loop */ + struct raft_uv_transport transport; /* UV I/O backend transport */ + + IdHash raftInstances; /* multi raft instances. traft use IdHash to manager multi vgroup inside, here we can use IdHash + too. */ +} RaftServer; + +void * startServerFunc(void *param); +int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid); +int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid); + +int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf); +int32_t raftServerStart(RaftServer *pRaftServer); +void raftServerStop(RaftServer *pRaftServer); + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result); +void putValueCb(struct raft_apply *req, int status, void *result); +void putValue(struct raft *r, const char *value); + +void raftChangeAddCb(struct raft_change *req, int status); + +const char *state2String(unsigned short state); +void printRaftConfiguration(struct raft_configuration *c); +void printRaftState(struct raft *r); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RAFT_SERVER_H diff --git a/contrib/test/traft/make_cluster/simpleHash.c b/contrib/test/traft/make_cluster/simpleHash.c new file mode 100644 index 0000000000000000000000000000000000000000..6694843874682cece47231f46a0ea4c7c355924c --- /dev/null +++ b/contrib/test/traft/make_cluster/simpleHash.c @@ -0,0 +1,218 @@ +#include "simpleHash.h" + +uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed) { + // Similar to murmur hash + const uint32_t m = 0xc6a4a793; + const uint32_t r = 24; + const char* limit = data + n; + uint32_t h = seed ^ (n * m); + + // Pick up four bytes at a time + while (data + 4 <= limit) { + // uint32_t w = DecodeFixed32(data); + uint32_t w; + memcpy(&w, data, 4); + + data += 4; + h += w; + h *= m; + h ^= (h >> 16); + } + + // Pick up remaining bytes + switch (limit - data) { + case 3: + h += (unsigned char)(data[2]) << 16; + do { + } while (0); + case 2: + h += (unsigned char)(data[1]) << 8; + do { + } while (0); + case 1: + h += (unsigned char)(data[0]); + h *= m; + h ^= (h >> r); + break; + } + return h; +} + +int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data) { + return insertSimpleHash(ths, key, strlen(key) + 1, data, strlen(data) + 1); +} + +int removeCStrSimpleHash(struct SimpleHash* ths, char* key) { return removeSimpleHash(ths, key, strlen(key) + 1); } + +SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key) { + return findSimpleHash(ths, key, strlen(key) + 1); +} + +int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen) { + SimpleHashNode** pp = ths->find(ths, key, keyLen); + if (*pp != NULL) { + fprintf(stderr, "insertSimpleHash, already has key \n"); + return -1; + } + + SimpleHashNode* node = malloc(sizeof(*node)); + node->hashCode = ths->hashFunc(key, keyLen); + node->key = malloc(keyLen); + node->keyLen = keyLen; + memcpy(node->key, key, keyLen); + node->data = malloc(dataLen); + node->dataLen = dataLen; + memcpy(node->data, data, dataLen); + node->next = NULL; + + // printf("insertSimpleHash: <%s, %ld, %s, %ld, %u> \n", node->key, node->keyLen, node->data, node->dataLen, + // node->hashCode); + + size_t index = node->hashCode & (ths->length - 1); + + SimpleHashNode* ptr = ths->table[index]; + if (ptr != NULL) { + node->next = ptr; + ths->table[index] = node; + + } else { + ths->table[index] = node; + } + ths->elems++; + if (ths->elems > 2 * ths->length) { + ths->resize(ths); + } + + return 0; +} + +int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) { + SimpleHashNode** pp = ths->find(ths, key, keyLen); + if (*pp == NULL) { + fprintf(stderr, "removeSimpleHash, key not exist \n"); + return -1; + } + + SimpleHashNode* del = *pp; + *pp = del->next; + free(del->key); + free(del->data); + free(del); + ths->elems--; + + return 0; +} + +SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) { + uint32_t hashCode = ths->hashFunc(key, keyLen); + // size_t index = hashCode % ths->length; + size_t index = hashCode & (ths->length - 1); + + // printf("findSimpleHash: %s %ld %u \n", key, keyLen, hashCode); + + SimpleHashNode** pp = &(ths->table[index]); + while (*pp != NULL && ((*pp)->hashCode != hashCode || memcmp(key, (*pp)->key, keyLen) != 0)) { + pp = &((*pp)->next); + } + + return pp; +} + +void printCStrSimpleHash(struct SimpleHash* ths) { + printf("\n--- printCStrSimpleHash: elems:%d length:%d \n", ths->elems, ths->length); + for (size_t i = 0; i < ths->length; ++i) { + SimpleHashNode* ptr = ths->table[i]; + if (ptr != NULL) { + printf("%zu: ", i); + while (ptr != NULL) { + printf("<%u, %s, %ld, %s, %ld> ", ptr->hashCode, (char*)ptr->key, ptr->keyLen, (char*)ptr->data, ptr->dataLen); + ptr = ptr->next; + } + printf("\n"); + } + } + printf("---------------\n"); +} + +void destroySimpleHash(struct SimpleHash* ths) { + for (size_t i = 0; i < ths->length; ++i) { + SimpleHashNode* ptr = ths->table[i]; + while (ptr != NULL) { + SimpleHashNode* tmp = ptr; + ptr = ptr->next; + free(tmp->key); + free(tmp->data); + free(tmp); + } + } + + ths->length = 0; + ths->elems = 0; + free(ths->table); + free(ths); +} + +void resizeSimpleHash(struct SimpleHash* ths) { + uint32_t new_length = ths->length; + while (new_length < ths->elems) { + new_length *= 2; + } + + printf("resizeSimpleHash: %p from %u to %u \n", ths, ths->length, new_length); + + SimpleHashNode** new_table = malloc(new_length * sizeof(SimpleHashNode*)); + memset(new_table, 0, new_length * sizeof(SimpleHashNode*)); + + uint32_t count = 0; + for (uint32_t i = 0; i < ths->length; i++) { + if (ths->table[i] == NULL) { + continue; + } + + SimpleHashNode* it = ths->table[i]; + while (it != NULL) { + SimpleHashNode* move_node = it; + it = it->next; + + // move move_node + move_node->next = NULL; + size_t index = move_node->hashCode & (new_length - 1); + + SimpleHashNode* ptr = new_table[index]; + if (ptr != NULL) { + move_node->next = ptr; + new_table[index] = move_node; + } else { + new_table[index] = move_node; + } + count++; + } + } + + assert(ths->elems == count); + free(ths->table); + ths->table = new_table; + ths->length = new_length; +} + +uint32_t simpleHashFunc(const char* key, size_t keyLen) { return mySimpleHash(key, keyLen, 1); } + +struct SimpleHash* newSimpleHash(size_t length) { + struct SimpleHash* ths = malloc(sizeof(*ths)); + + ths->length = length; + ths->elems = 0; + ths->table = malloc(length * sizeof(SimpleHashNode*)); + memset(ths->table, 0, length * sizeof(SimpleHashNode*)); + + ths->insert = insertSimpleHash; + ths->remove = removeSimpleHash; + ths->find = findSimpleHash; + ths->insert_cstr = insertCStrSimpleHash; + ths->remove_cstr = removeCStrSimpleHash; + ths->find_cstr = findCStrSimpleHash; + ths->print_cstr = printCStrSimpleHash; + ths->destroy = destroySimpleHash; + ths->resize = resizeSimpleHash; + ths->hashFunc = simpleHashFunc; +} diff --git a/contrib/test/traft/make_cluster/simpleHash.h b/contrib/test/traft/make_cluster/simpleHash.h new file mode 100644 index 0000000000000000000000000000000000000000..c6fcd93888ed5b53b5c4e1abcde3c38a5fe642dc --- /dev/null +++ b/contrib/test/traft/make_cluster/simpleHash.h @@ -0,0 +1,61 @@ +#ifndef __SIMPLE_HASH_H__ +#define __SIMPLE_HASH_H__ + +#include +#include +#include +#include +#include + +uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed); + +typedef struct SimpleHashNode { + uint32_t hashCode; + void* key; + size_t keyLen; + void* data; + size_t dataLen; + struct SimpleHashNode* next; +} SimpleHashNode; + +typedef struct SimpleHash { + // public: + + int (*insert)(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen); + int (*remove)(struct SimpleHash* ths, char* key, size_t keyLen); + SimpleHashNode** (*find)(struct SimpleHash* ths, char* key, size_t keyLen); + + // wrapper + int (*insert_cstr)(struct SimpleHash* ths, char* key, char* data); + int (*remove_cstr)(struct SimpleHash* ths, char* key); + SimpleHashNode** (*find_cstr)(struct SimpleHash* ths, char* key); + + void (*print_cstr)(struct SimpleHash* ths); + void (*destroy)(struct SimpleHash* ths); + + uint32_t length; + uint32_t elems; + + // private: + void (*resize)(struct SimpleHash* ths); + uint32_t (*hashFunc)(const char* key, size_t keyLen); + + SimpleHashNode** table; + +} SimpleHash; + +int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data); +int removeCStrSimpleHash(struct SimpleHash* ths, char* key); +SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key); +void printCStrSimpleHash(struct SimpleHash* ths); + +int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen); +int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen); +SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen); +void destroySimpleHash(struct SimpleHash* ths); +void resizeSimpleHash(struct SimpleHash* ths); +uint32_t simpleHashFunc(const char* key, size_t keyLen); + +struct SimpleHash* newSimpleHash(size_t length); + +#endif diff --git a/contrib/test/traft/make_cluster/util.c b/contrib/test/traft/make_cluster/util.c new file mode 100644 index 0000000000000000000000000000000000000000..ff704f366025b4d38343ec644cfea8f22b625414 --- /dev/null +++ b/contrib/test/traft/make_cluster/util.c @@ -0,0 +1,45 @@ +#include "util.h" +#include +#include +#include + +int dirOK(const char *path) { + DIR *dir = opendir(path); + if (dir != NULL) { + closedir(dir); + return 1; + } else { + return 0; + } +} + +int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr) { + if (n_arr <= 0) { + return -1; + } + + char *tmp = (char *)malloc(strlen(str) + 1); + strcpy(tmp, str); + char *context; + int n = 0; + + char *token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + strncpy(arr[n], token, TOKEN_LEN); + n++; + + while (1) { + token = strtok_r(NULL, separator, &context); + if (!token || n >= n_arr) { + goto ret; + } + strncpy(arr[n], token, TOKEN_LEN); + n++; + } + +ret: + free(tmp); + return n; +} diff --git a/contrib/test/traft/make_cluster/util.h b/contrib/test/traft/make_cluster/util.h new file mode 100644 index 0000000000000000000000000000000000000000..fb4ccb9c5c898688fde8335da5f3fccb39b08075 --- /dev/null +++ b/contrib/test/traft/make_cluster/util.h @@ -0,0 +1,17 @@ +#ifndef TRAFT_UTIL_H +#define TRAFT_UTIL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common.h" + +int dirOK(const char *path); +int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/rebalance_leader/CMakeLists.txt b/contrib/test/traft/rebalance_leader/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..92640bdd80f380913ad2c985b443b473e36dae60 --- /dev/null +++ b/contrib/test/traft/rebalance_leader/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(rebalanceLeader "") +target_sources(rebalanceLeader + PRIVATE + "raftMain.c" + "raftServer.c" +) +target_link_libraries(rebalanceLeader PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/rebalance_leader/clear.sh b/contrib/test/traft/rebalance_leader/clear.sh new file mode 100644 index 0000000000000000000000000000000000000000..398b3088f20ae8cce179ff909f206fc162918876 --- /dev/null +++ b/contrib/test/traft/rebalance_leader/clear.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +rm -rf 127.0.0.1* +rm -rf ./data diff --git a/contrib/test/traft/common.h b/contrib/test/traft/rebalance_leader/common.h similarity index 100% rename from contrib/test/traft/common.h rename to contrib/test/traft/rebalance_leader/common.h diff --git a/contrib/test/traft/raftMain.c b/contrib/test/traft/rebalance_leader/raftMain.c similarity index 94% rename from contrib/test/traft/raftMain.c rename to contrib/test/traft/rebalance_leader/raftMain.c index 24ad93856c5c21cb74d9799ff87c5f1d2c8481be..70dc191997ba208cf67fc44e116da1d8bf8c8681 100644 --- a/contrib/test/traft/raftMain.c +++ b/contrib/test/traft/rebalance_leader/raftMain.c @@ -60,9 +60,9 @@ void raftTransferCb(struct raft_transfer *req) { SRaftServer *pRaftServer = req->data; raft_free(req); - printf("raftTransferCb: \n"); + //printf("raftTransferCb: \n"); updateLeaderStates(pRaftServer); - printLeaderCount(); + //printLeaderCount(); int myLeaderCount; for (int i = 0; i < NODE_COUNT; ++i) { @@ -71,12 +71,13 @@ void raftTransferCb(struct raft_transfer *req) { } } - printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); + //printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { struct raft *r; for (int j = 0; j < pRaftServer->instanceCount; ++j) { if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { r = &pRaftServer->instance[j].raft; + break; } } @@ -87,17 +88,25 @@ void raftTransferCb(struct raft_transfer *req) { int minIndex = -1; int minLeaderCount = myLeaderCount; for (int j = 0; j < NODE_COUNT; ++j) { - if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; + if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) { + continue; + } + if (leaderStates[j].leaderCount <= minLeaderCount) { + minLeaderCount = leaderStates[j].leaderCount; minIndex = j; } } + char myHost[48]; uint16_t myPort; uint16_t myVid; decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); - + + + //printf("raftTransferCb transfer leader: vid[%u] choose: index:%d, leaderStates[%d].address:%s, leaderStates[%d].leaderCount:%d \n", minIndex, minIndex, leaderStates[minIndex].address, minIndex, leaderStates[minIndex].leaderCount); + char *destAddress = leaderStates[minIndex].address; char tokens[MAX_PEERS][MAX_TOKEN_LEN]; @@ -106,6 +115,9 @@ void raftTransferCb(struct raft_transfer *req) { uint16_t destPort = atoi(tokens[1]); destRaftId = encodeRaftId(destHost, destPort, myVid); + printf("\nraftTransferCb transfer leader: vgroupId:%u from:%s:%u --> to:%s:%u ", myVid, myHost, myPort, destHost, destPort); + fflush(stdout); + raft_transfer(r, transfer, destRaftId, raftTransferCb); } @@ -252,7 +264,6 @@ const char* state2String(unsigned short state) { void printRaftState2(struct raft *r) { - char leaderAddress[128]; memset(leaderAddress, 0, sizeof(leaderAddress)); @@ -350,6 +361,7 @@ void console(SRaftServer *pRaftServer) { while (1) { char cmd_buf[COMMAND_LEN]; memset(cmd_buf, 0, sizeof(cmd_buf)); + printf("(console)> "); char *ret = fgets(cmd_buf, COMMAND_LEN, stdin); if (!ret) { exit(-1); @@ -403,7 +415,10 @@ void console(SRaftServer *pRaftServer) { } else if (strcmp(cmd, "dropnode") == 0) { printf("not support \n"); - } else if (strcmp(cmd, "rebalance") == 0) { + } else if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) { + exit(0); + + } else if (strcmp(cmd, "rebalance") == 0 && strcmp(param1, "leader") == 0) { /* updateLeaderStates(pRaftServer); @@ -511,10 +526,14 @@ void console(SRaftServer *pRaftServer) { printRaftState(&pRaftServer->instance[i].raft); } - } else if (strcmp(cmd, "state2") == 0) { + } else if (strcmp(cmd, "leader") == 0 && strcmp(param1, "state") == 0) { + updateLeaderStates(pRaftServer); + printf("\n--------------------------------------------\n"); + printLeaderCount(); for (int i = 0; i < pRaftServer->instanceCount; ++i) { printRaftState2(&pRaftServer->instance[i].raft); } + printf("--------------------------------------------\n"); } else if (strcmp(cmd, "snapshot") == 0) { printf("not support \n"); diff --git a/contrib/test/traft/raftServer.c b/contrib/test/traft/rebalance_leader/raftServer.c similarity index 92% rename from contrib/test/traft/raftServer.c rename to contrib/test/traft/rebalance_leader/raftServer.c index 94de49cd0fd141e0afb35db2bd1946272b1d5c2f..165d3c9023fbe077c04d9bfa4f37784b07469675 100644 --- a/contrib/test/traft/raftServer.c +++ b/contrib/test/traft/rebalance_leader/raftServer.c @@ -3,32 +3,34 @@ #include "common.h" #include "raftServer.h" -char *keys; -char *values; +//char *keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);; +//char *values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + + +char keys[MAX_KV_LEN][MAX_RECORD_COUNT]; +char values[MAX_KV_LEN][MAX_RECORD_COUNT]; +int writeIndex = 0; void initStore() { - keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); - values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); - writeIndex = 0; } void destroyStore() { - free(keys); - free(values); + //free(keys); + //free(values); } void putKV(const char *key, const char *value) { if (writeIndex < MAX_RECORD_COUNT) { - strncpy(&keys[writeIndex], key, MAX_KV_LEN); - strncpy(&values[writeIndex], value, MAX_KV_LEN); + strncpy(keys[writeIndex], key, MAX_KV_LEN); + strncpy(values[writeIndex], value, MAX_KV_LEN); writeIndex++; } } char *getKV(const char *key) { for (int i = 0; i < MAX_RECORD_COUNT; ++i) { - if (strcmp(&keys[i], key) == 0) { - return &values[i]; + if (strcmp(keys[i], key) == 0) { + return values[i]; } } return NULL; diff --git a/contrib/test/traft/raftServer.h b/contrib/test/traft/rebalance_leader/raftServer.h similarity index 93% rename from contrib/test/traft/raftServer.h rename to contrib/test/traft/rebalance_leader/raftServer.h index b1f62caac590a2aa3f7d671378a0dcea00189b62..5ea43985c9b0601d884f40d8e544e1d0955d2bc0 100644 --- a/contrib/test/traft/raftServer.h +++ b/contrib/test/traft/rebalance_leader/raftServer.h @@ -15,11 +15,13 @@ extern "C" { // simulate a db store, just for test -#define MAX_KV_LEN 100 -#define MAX_RECORD_COUNT 500 -char *keys; -char *values; -int writeIndex; +#define MAX_KV_LEN 20 +#define MAX_RECORD_COUNT 16 + + +//char *keys; +//char *values; +//int writeIndex; void initStore(); void destroyStore(); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 39b43d16020d4a35bb279f33c9e3c3c3b5041440..aeaa1d8361a781e463fafecc1b5f0ed376efe8e4 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -152,7 +152,7 @@ TEST(testCase, create_db_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); } @@ -254,7 +254,7 @@ TEST(testCase, use_db_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); } @@ -505,15 +505,17 @@ TEST(testCase, create_multiple_tables) { taos_free_result(pRes); -// for(int32_t i = 0; i < 1000; ++i) { -// char sql[512] = {0}; -// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); -// TAOS_RES* pres = taos_query(pConn, sql); -// if (taos_errno(pres) != 0) { -// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); -// } -// taos_free_result(pres); -// } + for(int32_t i = 0; i < 200000; ++i) { + char sql[512] = {0}; + snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); + TAOS_RES* pres = taos_query(pConn, sql); + if (taos_errno(pres) != 0) { + printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); + } + + printf("%d\n", i); + taos_free_result(pres); + } taos_close(pConn); } diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 5f850f0112df4408b56f457fe25562eb8df10f7d..308fc9d2e5596e117f63c94a623c539b9c3424cb 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -22,12 +22,12 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vInfo("query message is processed"); + vTrace("query message is processed"); return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); } int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vInfo("fetch message is processed"); + vTrace("fetch message is processed"); switch (pMsg->msgType) { case TDMT_VND_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 045c92b5860dd7719caf6e6f2fecee0ecdc6e10e..b4d51e50a426f0b0ef9486c5a1bfde4a5bf33210 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -310,14 +310,21 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName } if (NULL == vgInfo) { - ctgError("no hash range found for hashvalue[%u]", hashValue); + ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo)); + + void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL); + while (pIter1) { + vgInfo = pIter1; + ctgError("valid range:[%u, %u], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId); + pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1); + } + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } *pVgroup = *vgInfo; _return: - CTG_RET(TSDB_CODE_SUCCESS); } @@ -773,7 +780,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup)); _return: - if (dbInfo) { CTG_UNLOCK(CTG_READ, &dbInfo->lock); taosHashRelease(pCatalog->dbCache.cache, dbInfo); diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 6e1256d857ee0d4accbf67912354e94d1e681df5..378af4c1d10821c123dc4fc16d017df6cd9f3b01 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -115,7 +115,7 @@ typedef struct TFileCacheKey { int32_t nColName; } ICacheKey; -int indexFlushCacheTFile(SIndex* sIdx, void*); +int indexFlushCacheToTFile(SIndex* sIdx, void*); int32_t indexSerialCacheKey(ICacheKey* key, char* buf); diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 544849ff2fab8a1018617db04606e4d4df148502..9db913debfc06f3406aa4d9105709419e5314f1e 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -21,9 +21,8 @@ #include "tskiplist.h" // ----------------- key structure in skiplist --------------------- -/* A data row, the format is like below: - * content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->| - * len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| +/* A data row, the format is like below + * content: |<---colVal---->|<-- version--->|<-- uid--->|<-- colType --->|<--operaType--->| */ #ifdef __cplusplus diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 0bf30d6efabb84b3d060ff6639387b6d8db45c52..d0b8fa4290d14f3363e3b1046b003fdc071393d2 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -384,7 +384,6 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { } } else { taosArrayPush(result, &tv); - // indexError("merge colVal: %s", tv->colVal); } } static void indexDestroyTempResult(SArray* result) { @@ -395,7 +394,7 @@ static void indexDestroyTempResult(SArray* result) { } taosArrayDestroy(result); } -int indexFlushCacheTFile(SIndex* sIdx, void* cache) { +int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 85e4c98276faf76977d88d6afeec7db373f4e89c..8bc3776ed9756771e119fb6b4b6c33ae6c800153 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -45,9 +45,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in return NULL; }; cache->mem = indexInternalCacheCreate(type); - - cache->colName = calloc(1, strlen(colName) + 1); - memcpy(cache->colName, colName, strlen(colName)); + cache->colName = tstrdup(colName); cache->type = type; cache->index = idx; cache->version = 0; @@ -187,8 +185,8 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { break; } else if (cache->imm != NULL) { // TODO: wake up by condition variable - // pthread_mutex_unlock(&cache->mtx); pthread_cond_wait(&cache->finished, &cache->mtx); + // pthread_mutex_unlock(&cache->mtx); // taosMsleep(50); // pthread_mutex_lock(&cache->mtx); } else { @@ -353,7 +351,7 @@ static MemTable* indexInternalCacheCreate(int8_t type) { static void doMergeWork(SSchedMsg* msg) { IndexCache* pCache = msg->ahandle; SIndex* sidx = (SIndex*)pCache->index; - indexFlushCacheTFile(sidx, pCache); + indexFlushCacheToTFile(sidx, pCache); } static bool indexCacheIteratorNext(Iterate* itera) { SSkipListIterator* iter = itera->iter; diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 260c1708cb97a4dbf0cb845505188cdc77fcd506..0763aae85703ee5a4e2a4f88be9bdc6f985ee7a1 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -18,8 +18,6 @@ #include "tutil.h" static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { - // if (ctx->offset + len > ctx->limit) { return -1; } - if (ctx->type == TFile) { assert(len == tfWrite(ctx->file.fd, buf, len)); } else { diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index d48041fbdbb2792d06d3c63a5750bd985fbdc1cb..efe76700896b37219f0eaccb7bcaee358e9223de 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - tdDestroyKVRowBuilder(pKvRowBuilder); return buildInvalidOperationMsg(pMsgBuf, msg1); } } @@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa const char* msg3 = "tag value too long"; const char* msg4 = "illegal value or data overflow"; + int32_t code = 0; + STableMeta* pSuperTableMeta = NULL; + SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); // super table name, create table by using dst @@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SToken* pSTableNameToken = &pCreateTableInfo->stbName; - int32_t code = parserValidateNameToken(pSTableNameToken); + code = parserValidateNameToken(pSTableNameToken); if (code != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SName name = {0}; code = createSName(&name, pSTableNameToken, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _error; } SKVRowBuilder kvRowBuilder = {0}; if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; } SArray* pValList = pCreateTableInfo->pTagVals; size_t numOfInputTag = taosArrayGetSize(pValList); - STableMeta* pSuperTableMeta = NULL; code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _error; } assert(pSuperTableMeta != NULL); @@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa if (numOfInputTag != numOfBoundTags || schemaSize < numOfInputTag) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } bool findColumnIndex = false; @@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (pItem->pVar.nLen > pSchema->bytes) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } } else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { @@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true); // check again after the convert since it may be converted from binary to nchar. - if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pSchema->type)) { int16_t len = varDataTLen(tagVal); if (len > pSchema->bytes) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } } if (code != TSDB_CODE_SUCCESS) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg4); + code = buildInvalidOperationMsg(pMsgBuf, msg4); + goto _error; } tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); @@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa } else { if (schemaSize != numOfInputTag) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { tdDestroyKVRowBuilder(&kvRowBuilder); - tfree(pSuperTableMeta); - return code; + goto _error; } } SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder); if (row == NULL) { - tfree(pSuperTableMeta); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } tdSortKVRowByColIdx(row); @@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa SName tableName = {0}; code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { - tfree(pSuperTableMeta); - return code; + goto _error; } // Find a appropriate vgroup to accommodate this table , according to the table name SVgroupInfo info = {0}; - catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + code = catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info); tfree(pSuperTableMeta); } *pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap); + if (*pBufArray == NULL) { + code = terrno; + goto _error; + } taosHashCleanup(pVgroupHashmap); return TSDB_CODE_SUCCESS; + + _error: + taosHashCleanup(pVgroupHashmap); + tfree(pSuperTableMeta); + terrno = code; + return code; } static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { @@ -612,11 +626,12 @@ static int32_t doBuildSingleTableBatchReq(SName* pTableName, SArray* pColumns, S int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { SArray* pBufArray = NULL; + int32_t code = 0; // it is a sql statement to create a normal table if (pCreateTable->childTableInfo == NULL) { assert(taosArrayGetSize(pCreateTable->colInfo.pColumns) > 0 && pCreateTable->colInfo.pTagColumns == NULL); - int32_t code = doCheckForCreateTable(pCreateTable, pMsgBuf); + code = doCheckForCreateTable(pCreateTable, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -645,7 +660,10 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasic destroyCreateTbReqBatch(&tbatch); } else { // it is a child table, created according to a super table - doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray); + code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray); + if (code != 0) { + return code; + } } SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo)); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 779a2646996e1614701c48e3e11aa861687903eb..a7ec39bfdef719627eb89ec7b14c1a69533c9f58 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -67,8 +67,8 @@ typedef struct SSchTask { int32_t msgLen; // msg length int8_t status; // task status SQueryNodeAddr execAddr; // task actual executed node address - int8_t condidateIdx; // current try condidation index - SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr + int8_t candidateIdx; // current try condidation index + SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 69df5dddeea9cb795658d969ece414700082597a..f5a49e782c5826de399b56c614c3dff13e25d475 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) { } static void cleanupTask(SSchTask* pTask) { - taosArrayDestroy(pTask->condidateAddrs); + taosArrayDestroy(pTask->candidateAddrs); } int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) { @@ -226,20 +226,20 @@ _return: SCH_RET(code); } -int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { - if (task->condidateAddrs) { +int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { + if (task->candidateAddrs) { return TSDB_CODE_SUCCESS; } - task->condidateIdx = 0; - task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); - if (NULL == task->condidateAddrs) { + task->candidateIdx = 0; + task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == task->candidateAddrs) { qError("taosArrayInit failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } if (task->plan->execNode.numOfEps > 0) { - if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); - if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -273,19 +273,20 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } - -int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { - if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { - qError("taosHashPut failed"); +int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { + if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { + qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), + pJob->queryId); return TSDB_CODE_SUCCESS; } int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); + qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); return TSDB_CODE_SUCCESS; } @@ -583,9 +584,12 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } + int32_t s = taosHashGetSize((*job)->execTasks); + assert(s != 0); + SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); + qError("failed to get task, taskId:%"PRIx64" not exist, reqId:0x%"PRIx64, pParam->taskId, (*job)->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -798,7 +802,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SEpSet epSet; - SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx); + SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx); schConvertAddrToEpSet(addr, &epSet); @@ -816,15 +820,16 @@ _return: int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); - SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); + SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); - if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { - SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); + if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { + SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); + // NOTE: race condition: the task should be put into the hash table before send msg to server SCH_ERR_RET(schPushTaskToExecList(job, task)); + SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); task->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; @@ -975,7 +980,7 @@ _return: } int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { - if (NULL == transport || /* NULL == nodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { + if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); }