提交 0cffefc8 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

...@@ -48,10 +48,11 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, ...@@ -48,10 +48,11 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf,
int32_t raftServerStart(SRaftServer *pRaftServer); int32_t raftServerStart(SRaftServer *pRaftServer);
void raftServerClose(SRaftServer *pRaftServer); void raftServerClose(SRaftServer *pRaftServer);
int initFsm(struct raft_fsm *fsm); int initFsm(struct raft_fsm *fsm);
const char* state2String(unsigned short state);
void printRaftConfiguration(struct raft_configuration *c);
void printRaftState(struct raft *r);
#ifdef __cplusplus #ifdef __cplusplus
......
add_executable(raftMain "") add_subdirectory(rebalance_leader)
target_sources(raftMain add_subdirectory(make_cluster)
PRIVATE
"raftMain.c"
"raftServer.c"
)
target_link_libraries(raftMain PUBLIC traft lz4 uv_a)
add_executable(makeCluster "")
target_sources(makeCluster
PRIVATE
"raftMain.c"
"raftServer.c"
"config.c"
"console.c"
"simpleHash.c"
"util.c"
)
target_link_libraries(makeCluster PUBLIC traft lz4 uv_a)
#ifndef TRAFT_COMMON_H
#define TRAFT_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#define COMMAND_LEN 512
#define MAX_CMD_COUNT 10
#define TOKEN_LEN 128
#define MAX_PEERS_COUNT 19
#define HOST_LEN 64
#define ADDRESS_LEN (HOST_LEN * 2)
#define BASE_DIR_LEN 128
#ifdef __cplusplus
}
#endif
#endif
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
void addrToString(const char *host, uint16_t port, char *addr, int len) { snprintf(addr, len, "%s:%hu", host, port); }
void parseAddr(const char *addr, char *host, int len, uint16_t *port) {
char *tmp = (char *)malloc(strlen(addr) + 1);
strcpy(tmp, addr);
char *context;
char *separator = ":";
char *token = strtok_r(tmp, separator, &context);
if (token) {
snprintf(host, len, "%s", token);
}
token = strtok_r(NULL, separator, &context);
if (token) {
sscanf(token, "%hu", port);
}
free(tmp);
}
int parseConf(int argc, char **argv, RaftServerConfig *pConf) {
memset(pConf, 0, sizeof(*pConf));
int option_index, option_value;
option_index = 0;
static struct option long_options[] = {{"help", no_argument, NULL, 'h'},
{"addr", required_argument, NULL, 'a'},
{"dir", required_argument, NULL, 'd'},
{NULL, 0, NULL, 0}};
while ((option_value = getopt_long(argc, argv, "ha:d:", long_options, &option_index)) != -1) {
switch (option_value) {
case 'a': {
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
break;
}
case 'd': {
snprintf(pConf->baseDir, sizeof(pConf->baseDir), "%s", optarg);
break;
}
case 'h': {
return -2;
}
default: { return -2; }
}
}
return 0;
}
void printConf(RaftServerConfig *pConf) {
printf("\n---printConf: \n");
printf("me: [%s:%hu] \n", pConf->me.host, pConf->me.port);
printf("dataDir: [%s] \n\n", pConf->baseDir);
}
#ifndef TRAFT_CONFIG_H
#define TRAFT_CONFIG_H
#ifdef __cplusplus
extern "C" {
#endif
#include <getopt.h>
#include <stdint.h>
#include "common.h"
typedef struct {
char host[HOST_LEN];
uint16_t port;
} Addr;
typedef struct {
Addr me;
char baseDir[BASE_DIR_LEN];
} RaftServerConfig;
void addrToString(const char *host, uint16_t port, char *addr, int len);
void parseAddr(const char *addr, char *host, int len, uint16_t *port);
int parseConf(int argc, char **argv, RaftServerConfig *pConf);
void printConf(RaftServerConfig *pConf);
#ifdef __cplusplus
}
#endif
#endif
#include "console.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "raftServer.h"
#include "util.h"
void printHelp() {
printf("---------------------\n");
printf("help: \n\n");
printf("create a vgroup with 3 replicas: \n");
printf("create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 \n");
printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10002 \n");
printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10001 \n");
printf("\n");
printf("create a vgroup with only one replica: \n");
printf("create vnode voter vid 200 \n");
printf("\n");
printf("add vnode into vgroup: \n");
printf("create vnode spare vid 100 ---- run at 127.0.0.1:10003\n");
printf("join vnode vid 100 addr 127.0.0.1:10003 ---- run at leader of vgroup 100\n");
printf("\n");
printf("run \n");
printf("put 0 key value \n");
printf("get 0 key \n");
printf("---------------------\n");
}
void console(RaftServer *pRaftServer) {
while (1) {
int ret;
char cmdBuf[COMMAND_LEN];
memset(cmdBuf, 0, sizeof(cmdBuf));
printf("(console)> ");
char *retp = fgets(cmdBuf, COMMAND_LEN, stdin);
if (!retp) {
exit(-1);
}
int pos = strlen(cmdBuf);
if (cmdBuf[pos - 1] == '\n') {
cmdBuf[pos - 1] = '\0';
}
if (strncmp(cmdBuf, "", COMMAND_LEN) == 0) {
continue;
}
char cmds[MAX_CMD_COUNT][TOKEN_LEN];
memset(cmds, 0, sizeof(cmds));
int cmdCount;
cmdCount = splitString(cmdBuf, " ", cmds, MAX_CMD_COUNT);
if (strcmp(cmds[0], "create") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[3], "vid") == 0) {
uint16_t vid;
sscanf(cmds[4], "%hu", &vid);
if (strcmp(cmds[2], "voter") == 0) {
char peers[MAX_PEERS_COUNT][ADDRESS_LEN];
memset(peers, 0, sizeof(peers));
uint32_t peersCount = 0;
if (strcmp(cmds[5], "peers") == 0 && cmdCount > 6) {
// create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002
for (int i = 6; i < cmdCount; ++i) {
snprintf(peers[i - 6], ADDRESS_LEN, "%s", cmds[i]);
peersCount++;
}
} else {
// create vnode voter vid 200
}
ret = addRaftVoter(pRaftServer, peers, peersCount, vid);
if (ret == 0) {
printf("create vnode voter ok \n");
} else {
printf("create vnode voter error \n");
}
} else if (strcmp(cmds[2], "spare") == 0) {
ret = addRaftSpare(pRaftServer, vid);
if (ret == 0) {
printf("create vnode spare ok \n");
} else {
printf("create vnode spare error \n");
}
} else {
printHelp();
}
} else if (strcmp(cmds[0], "join") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[2], "vid") == 0 &&
strcmp(cmds[4], "addr") == 0 && cmdCount == 6) {
// join vnode vid 100 addr 127.0.0.1:10004
char * address = cmds[5];
char host[64];
uint16_t port;
parseAddr(address, host, sizeof(host), &port);
uint16_t vid;
sscanf(cmds[3], "%hu", &vid);
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
if (*pp == NULL) {
printf("vid:%hu not found \n", vid);
break;
}
RaftInstance *pRaftInstance = (*pp)->data;
uint64_t destRaftId = encodeRaftId(host, port, vid);
struct raft_change *req = raft_malloc(sizeof(*req));
RaftJoin * pRaftJoin = raft_malloc(sizeof(*pRaftJoin));
pRaftJoin->r = &pRaftInstance->raft;
pRaftJoin->joinId = destRaftId;
req->data = pRaftJoin;
ret = raft_add(&pRaftInstance->raft, req, destRaftId, address, raftChangeAddCb);
if (ret != 0) {
printf("raft_add error: %s \n", raft_errmsg(&pRaftInstance->raft));
}
} else if (strcmp(cmds[0], "dropnode") == 0) {
} else if (strcmp(cmds[0], "state") == 0) {
pRaftServer->raftInstances.print(&pRaftServer->raftInstances);
for (size_t i = 0; i < pRaftServer->raftInstances.length; ++i) {
HashNode *ptr = pRaftServer->raftInstances.table[i];
if (ptr != NULL) {
while (ptr != NULL) {
RaftInstance *pRaftInstance = ptr->data;
printf("instance vid:%hu raftId:%llu \n", ptr->vgroupId, pRaftInstance->raftId);
printRaftState(&pRaftInstance->raft);
printf("\n");
ptr = ptr->next;
}
printf("\n");
}
}
} else if (strcmp(cmds[0], "put") == 0 && cmdCount == 4) {
uint16_t vid;
sscanf(cmds[1], "%hu", &vid);
char * key = cmds[2];
char * value = cmds[3];
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
if (*pp == NULL) {
printf("vid:%hu not found \n", vid);
break;
}
RaftInstance *pRaftInstance = (*pp)->data;
char *raftValue = malloc(TOKEN_LEN * 2 + 3);
snprintf(raftValue, TOKEN_LEN * 2 + 3, "%s--%s", key, value);
putValue(&pRaftInstance->raft, raftValue);
free(raftValue);
} else if (strcmp(cmds[0], "run") == 0) {
pthread_t tidRaftServer;
pthread_create(&tidRaftServer, NULL, startServerFunc, pRaftServer);
} else if (strcmp(cmds[0], "get") == 0 && cmdCount == 3) {
uint16_t vid;
sscanf(cmds[1], "%hu", &vid);
char * key = cmds[2];
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
if (*pp == NULL) {
printf("vid:%hu not found \n", vid);
break;
}
RaftInstance * pRaftInstance = (*pp)->data;
SimpleHash * pKV = pRaftInstance->fsm.data;
SimpleHashNode **ppNode = pKV->find_cstr(pKV, key);
if (*ppNode == NULL) {
printf("key:%s not found \n", key);
} else {
printf("find key:%s value:%s \n", key, (char *)((*ppNode)->data));
}
} else if (strcmp(cmds[0], "transfer") == 0) {
} else if (strcmp(cmds[0], "state") == 0) {
} else if (strcmp(cmds[0], "snapshot") == 0) {
} else if (strcmp(cmds[0], "exit") == 0) {
exit(0);
} else if (strcmp(cmds[0], "quit") == 0) {
exit(0);
} else if (strcmp(cmds[0], "help") == 0) {
printHelp();
} else {
printf("unknown command: %s \n", cmdBuf);
printHelp();
}
/*
printf("cmdBuf: [%s] \n", cmdBuf);
printf("cmdCount : %d \n", cmdCount);
for (int i = 0; i < MAX_CMD_COUNT; ++i) {
printf("cmd%d : %s \n", i, cmds[i]);
}
*/
}
}
#ifndef TRAFT_CONSOLE_H
#define TRAFT_CONSOLE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <getopt.h>
#include <stdint.h>
#include "common.h"
#include "raftServer.h"
void console(RaftServer *pRaftServer);
#ifdef __cplusplus
}
#endif
#endif
#include <assert.h>
#include <getopt.h>
#include <pthread.h>
#include <raft.h>
#include <raft/uv.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "common.h"
#include "config.h"
#include "console.h"
#include "raftServer.h"
#include "simpleHash.h"
#include "util.h"
const char *exe_name;
void *startConsoleFunc(void *param) {
RaftServer *pRaftServer = (RaftServer *)param;
console(pRaftServer);
return NULL;
}
void usage() {
printf("\nusage: \n");
printf("%s --addr=127.0.0.1:10000 --dir=./data \n", exe_name);
printf("\n");
}
RaftServerConfig gConfig;
RaftServer gRaftServer;
int main(int argc, char **argv) {
srand(time(NULL));
int32_t ret;
exe_name = argv[0];
if (argc < 3) {
usage();
exit(-1);
}
ret = parseConf(argc, argv, &gConfig);
if (ret != 0) {
usage();
exit(-1);
}
printConf(&gConfig);
if (!dirOK(gConfig.baseDir)) {
ret = mkdir(gConfig.baseDir, 0775);
if (ret != 0) {
fprintf(stderr, "mkdir error, %s \n", gConfig.baseDir);
exit(-1);
}
}
ret = raftServerInit(&gRaftServer, &gConfig);
if (ret != 0) {
fprintf(stderr, "raftServerInit error \n");
exit(-1);
}
/*
pthread_t tidRaftServer;
pthread_create(&tidRaftServer, NULL, startServerFunc, &gRaftServer);
*/
pthread_t tidConsole;
pthread_create(&tidConsole, NULL, startConsoleFunc, &gRaftServer);
while (1) {
sleep(10);
}
return 0;
}
#include "raftServer.h"
#include <stdlib.h>
#include <unistd.h>
#include "common.h"
#include "simpleHash.h"
#include "util.h"
void *startServerFunc(void *param) {
RaftServer *pRaftServer = (RaftServer *)param;
int32_t r = raftServerStart(pRaftServer);
assert(r == 0);
return NULL;
}
void raftChangeAssignCb(struct raft_change *req, int status) {
struct raft *r = req->data;
if (status != 0) {
printf("raftChangeAssignCb error: %s \n", raft_errmsg(r));
} else {
printf("raftChangeAssignCb ok \n");
}
raft_free(req);
}
void raftChangeAddCb(struct raft_change *req, int status) {
RaftJoin *pRaftJoin = req->data;
if (status != 0) {
printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r));
} else {
struct raft_change *req2 = raft_malloc(sizeof(*req2));
req2->data = pRaftJoin->r;
int ret = raft_assign(pRaftJoin->r, req2, pRaftJoin->joinId, RAFT_VOTER, raftChangeAssignCb);
if (ret != 0) {
printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r));
}
}
raft_free(req->data);
raft_free(req);
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
// get fsm data
SimpleHash *sh = pFsm->data;
// get commit value
char *msg = (char *)buf->base;
printf("fsm apply: [%s] \n", msg);
char arr[2][TOKEN_LEN];
int r = splitString(msg, "--", arr, 2);
assert(r == 2);
// do the value on fsm
sh->insert_cstr(sh, arr[0], arr[1]);
raft_free(buf->base);
return 0;
}
void putValueCb(struct raft_apply *req, int status, void *result) {
struct raft *r = req->data;
if (status != 0) {
printf("putValueCb error: %s \n", raft_errmsg(r));
} else {
printf("putValueCb: %s \n", "ok");
}
raft_free(req);
}
void putValue(struct raft *r, const char *value) {
struct raft_buffer buf;
buf.len = strlen(value) + 1;
buf.base = raft_malloc(buf.len);
snprintf(buf.base, buf.len, "%s", value);
struct raft_apply *req = raft_malloc(sizeof(*req));
req->data = r;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char *)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
const char *state2String(unsigned short state) {
if (state == RAFT_UNAVAILABLE) {
return "RAFT_UNAVAILABLE";
} else if (state == RAFT_FOLLOWER) {
return "RAFT_FOLLOWER";
} else if (state == RAFT_CANDIDATE) {
return "RAFT_CANDIDATE";
} else if (state == RAFT_LEADER) {
return "RAFT_LEADER";
}
return "UNKNOWN_RAFT_STATE";
}
void printRaftConfiguration(struct raft_configuration *c) {
printf("configuration: \n");
for (int i = 0; i < c->n; ++i) {
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
}
}
void printRaftState(struct raft *r) {
printf("----Raft State: -----------\n");
printf("mem_addr: %p \n", r);
printf("my_id: %llu \n", r->id);
printf("address: %s \n", r->address);
printf("current_term: %llu \n", r->current_term);
printf("voted_for: %llu \n", r->voted_for);
printf("role: %s \n", state2String(r->state));
printf("commit_index: %llu \n", r->commit_index);
printf("last_applied: %llu \n", r->last_applied);
printf("last_stored: %llu \n", r->last_stored);
printf("configuration_index: %llu \n", r->configuration_index);
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
printRaftConfiguration(&r->configuration);
printf("----------------------------\n");
}
int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid) {
int ret;
RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance));
assert(pRaftInstance != NULL);
// init raftId
pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
// init dir
snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir,
pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId);
if (!dirOK(pRaftInstance->dir)) {
ret = mkdir(pRaftInstance->dir, 0775);
if (ret != 0) {
fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir);
assert(0);
}
}
// init fsm
pRaftInstance->fsm.data = newSimpleHash(2);
pRaftInstance->fsm.apply = fsmApplyCb;
// init io
ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport);
if (ret != 0) {
fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft
ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId,
pRaftServer->address);
if (ret != 0) {
fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft_configuration
struct raft_configuration conf;
raft_configuration_init(&conf);
raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_VOTER);
for (int i = 0; i < peersCount; ++i) {
char * peerAddress = peers[i];
char host[64];
uint16_t port;
parseAddr(peerAddress, host, sizeof(host), &port);
uint64_t raftId = encodeRaftId(host, port, vid);
raft_configuration_add(&conf, raftId, peers[i], RAFT_VOTER);
}
raft_bootstrap(&pRaftInstance->raft, &conf);
// start raft
ret = raft_start(&pRaftInstance->raft);
if (ret != 0) {
fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// add raft instance into raft server
pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance);
return 0;
}
int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid) {
int ret;
RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance));
assert(pRaftInstance != NULL);
// init raftId
pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
// init dir
snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir,
pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId);
ret = mkdir(pRaftInstance->dir, 0775);
if (ret != 0) {
fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir);
assert(0);
}
// init fsm
pRaftInstance->fsm.data = newSimpleHash(2);
pRaftInstance->fsm.apply = fsmApplyCb;
// init io
ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport);
if (ret != 0) {
fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft
ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId,
pRaftServer->address);
if (ret != 0) {
fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// init raft_configuration
struct raft_configuration conf;
raft_configuration_init(&conf);
raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_SPARE);
raft_bootstrap(&pRaftInstance->raft, &conf);
// start raft
ret = raft_start(&pRaftInstance->raft);
if (ret != 0) {
fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft));
assert(0);
}
// add raft instance into raft server
pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance);
return 0;
}
int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf) {
int ret;
// init host, port, address, dir
snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
pRaftServer->port = pConf->me.port;
snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
snprintf(pRaftServer->baseDir, sizeof(pRaftServer->baseDir), "%s", pConf->baseDir);
// init loop
ret = uv_loop_init(&pRaftServer->loop);
if (ret != 0) {
fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret));
assert(0);
}
// init network
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
if (ret != 0) {
fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret);
assert(0);
}
// init raft instance container
initIdHash(&pRaftServer->raftInstances, 2);
return 0;
}
int32_t raftServerStart(RaftServer *pRaftServer) {
// start loop
uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
return 0;
}
void raftServerStop(RaftServer *pRaftServer) {}
#ifndef TDENGINE_RAFT_SERVER_H
#define TDENGINE_RAFT_SERVER_H
#ifdef __cplusplus
extern "C" {
#endif
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include "common.h"
#include "config.h"
#include "raft.h"
#include "raft/uv.h"
#include "simpleHash.h"
typedef struct RaftJoin {
struct raft *r;
raft_id joinId;
} RaftJoin;
typedef struct {
raft_id raftId;
char dir[BASE_DIR_LEN * 2];
struct raft_fsm fsm;
struct raft_io io;
struct raft raft;
} RaftInstance;
typedef struct {
char host[HOST_LEN];
uint16_t port;
char address[ADDRESS_LEN]; /* Raft instance address */
char baseDir[BASE_DIR_LEN]; /* Raft instance address */
struct uv_loop_s loop; /* UV loop */
struct raft_uv_transport transport; /* UV I/O backend transport */
IdHash raftInstances; /* multi raft instances. traft use IdHash to manager multi vgroup inside, here we can use IdHash
too. */
} RaftServer;
void * startServerFunc(void *param);
int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid);
int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid);
int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf);
int32_t raftServerStart(RaftServer *pRaftServer);
void raftServerStop(RaftServer *pRaftServer);
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result);
void putValueCb(struct raft_apply *req, int status, void *result);
void putValue(struct raft *r, const char *value);
void raftChangeAddCb(struct raft_change *req, int status);
const char *state2String(unsigned short state);
void printRaftConfiguration(struct raft_configuration *c);
void printRaftState(struct raft *r);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RAFT_SERVER_H
#include "simpleHash.h"
uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed) {
// Similar to murmur hash
const uint32_t m = 0xc6a4a793;
const uint32_t r = 24;
const char* limit = data + n;
uint32_t h = seed ^ (n * m);
// Pick up four bytes at a time
while (data + 4 <= limit) {
// uint32_t w = DecodeFixed32(data);
uint32_t w;
memcpy(&w, data, 4);
data += 4;
h += w;
h *= m;
h ^= (h >> 16);
}
// Pick up remaining bytes
switch (limit - data) {
case 3:
h += (unsigned char)(data[2]) << 16;
do {
} while (0);
case 2:
h += (unsigned char)(data[1]) << 8;
do {
} while (0);
case 1:
h += (unsigned char)(data[0]);
h *= m;
h ^= (h >> r);
break;
}
return h;
}
int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data) {
return insertSimpleHash(ths, key, strlen(key) + 1, data, strlen(data) + 1);
}
int removeCStrSimpleHash(struct SimpleHash* ths, char* key) { return removeSimpleHash(ths, key, strlen(key) + 1); }
SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key) {
return findSimpleHash(ths, key, strlen(key) + 1);
}
int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen) {
SimpleHashNode** pp = ths->find(ths, key, keyLen);
if (*pp != NULL) {
fprintf(stderr, "insertSimpleHash, already has key \n");
return -1;
}
SimpleHashNode* node = malloc(sizeof(*node));
node->hashCode = ths->hashFunc(key, keyLen);
node->key = malloc(keyLen);
node->keyLen = keyLen;
memcpy(node->key, key, keyLen);
node->data = malloc(dataLen);
node->dataLen = dataLen;
memcpy(node->data, data, dataLen);
node->next = NULL;
// printf("insertSimpleHash: <%s, %ld, %s, %ld, %u> \n", node->key, node->keyLen, node->data, node->dataLen,
// node->hashCode);
size_t index = node->hashCode & (ths->length - 1);
SimpleHashNode* ptr = ths->table[index];
if (ptr != NULL) {
node->next = ptr;
ths->table[index] = node;
} else {
ths->table[index] = node;
}
ths->elems++;
if (ths->elems > 2 * ths->length) {
ths->resize(ths);
}
return 0;
}
int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) {
SimpleHashNode** pp = ths->find(ths, key, keyLen);
if (*pp == NULL) {
fprintf(stderr, "removeSimpleHash, key not exist \n");
return -1;
}
SimpleHashNode* del = *pp;
*pp = del->next;
free(del->key);
free(del->data);
free(del);
ths->elems--;
return 0;
}
SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) {
uint32_t hashCode = ths->hashFunc(key, keyLen);
// size_t index = hashCode % ths->length;
size_t index = hashCode & (ths->length - 1);
// printf("findSimpleHash: %s %ld %u \n", key, keyLen, hashCode);
SimpleHashNode** pp = &(ths->table[index]);
while (*pp != NULL && ((*pp)->hashCode != hashCode || memcmp(key, (*pp)->key, keyLen) != 0)) {
pp = &((*pp)->next);
}
return pp;
}
void printCStrSimpleHash(struct SimpleHash* ths) {
printf("\n--- printCStrSimpleHash: elems:%d length:%d \n", ths->elems, ths->length);
for (size_t i = 0; i < ths->length; ++i) {
SimpleHashNode* ptr = ths->table[i];
if (ptr != NULL) {
printf("%zu: ", i);
while (ptr != NULL) {
printf("<%u, %s, %ld, %s, %ld> ", ptr->hashCode, (char*)ptr->key, ptr->keyLen, (char*)ptr->data, ptr->dataLen);
ptr = ptr->next;
}
printf("\n");
}
}
printf("---------------\n");
}
void destroySimpleHash(struct SimpleHash* ths) {
for (size_t i = 0; i < ths->length; ++i) {
SimpleHashNode* ptr = ths->table[i];
while (ptr != NULL) {
SimpleHashNode* tmp = ptr;
ptr = ptr->next;
free(tmp->key);
free(tmp->data);
free(tmp);
}
}
ths->length = 0;
ths->elems = 0;
free(ths->table);
free(ths);
}
void resizeSimpleHash(struct SimpleHash* ths) {
uint32_t new_length = ths->length;
while (new_length < ths->elems) {
new_length *= 2;
}
printf("resizeSimpleHash: %p from %u to %u \n", ths, ths->length, new_length);
SimpleHashNode** new_table = malloc(new_length * sizeof(SimpleHashNode*));
memset(new_table, 0, new_length * sizeof(SimpleHashNode*));
uint32_t count = 0;
for (uint32_t i = 0; i < ths->length; i++) {
if (ths->table[i] == NULL) {
continue;
}
SimpleHashNode* it = ths->table[i];
while (it != NULL) {
SimpleHashNode* move_node = it;
it = it->next;
// move move_node
move_node->next = NULL;
size_t index = move_node->hashCode & (new_length - 1);
SimpleHashNode* ptr = new_table[index];
if (ptr != NULL) {
move_node->next = ptr;
new_table[index] = move_node;
} else {
new_table[index] = move_node;
}
count++;
}
}
assert(ths->elems == count);
free(ths->table);
ths->table = new_table;
ths->length = new_length;
}
uint32_t simpleHashFunc(const char* key, size_t keyLen) { return mySimpleHash(key, keyLen, 1); }
struct SimpleHash* newSimpleHash(size_t length) {
struct SimpleHash* ths = malloc(sizeof(*ths));
ths->length = length;
ths->elems = 0;
ths->table = malloc(length * sizeof(SimpleHashNode*));
memset(ths->table, 0, length * sizeof(SimpleHashNode*));
ths->insert = insertSimpleHash;
ths->remove = removeSimpleHash;
ths->find = findSimpleHash;
ths->insert_cstr = insertCStrSimpleHash;
ths->remove_cstr = removeCStrSimpleHash;
ths->find_cstr = findCStrSimpleHash;
ths->print_cstr = printCStrSimpleHash;
ths->destroy = destroySimpleHash;
ths->resize = resizeSimpleHash;
ths->hashFunc = simpleHashFunc;
}
#ifndef __SIMPLE_HASH_H__
#define __SIMPLE_HASH_H__
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed);
typedef struct SimpleHashNode {
uint32_t hashCode;
void* key;
size_t keyLen;
void* data;
size_t dataLen;
struct SimpleHashNode* next;
} SimpleHashNode;
typedef struct SimpleHash {
// public:
int (*insert)(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen);
int (*remove)(struct SimpleHash* ths, char* key, size_t keyLen);
SimpleHashNode** (*find)(struct SimpleHash* ths, char* key, size_t keyLen);
// wrapper
int (*insert_cstr)(struct SimpleHash* ths, char* key, char* data);
int (*remove_cstr)(struct SimpleHash* ths, char* key);
SimpleHashNode** (*find_cstr)(struct SimpleHash* ths, char* key);
void (*print_cstr)(struct SimpleHash* ths);
void (*destroy)(struct SimpleHash* ths);
uint32_t length;
uint32_t elems;
// private:
void (*resize)(struct SimpleHash* ths);
uint32_t (*hashFunc)(const char* key, size_t keyLen);
SimpleHashNode** table;
} SimpleHash;
int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data);
int removeCStrSimpleHash(struct SimpleHash* ths, char* key);
SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key);
void printCStrSimpleHash(struct SimpleHash* ths);
int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen);
int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen);
SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen);
void destroySimpleHash(struct SimpleHash* ths);
void resizeSimpleHash(struct SimpleHash* ths);
uint32_t simpleHashFunc(const char* key, size_t keyLen);
struct SimpleHash* newSimpleHash(size_t length);
#endif
#include "util.h"
#include <dirent.h>
#include <stdlib.h>
#include <string.h>
int dirOK(const char *path) {
DIR *dir = opendir(path);
if (dir != NULL) {
closedir(dir);
return 1;
} else {
return 0;
}
}
int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr) {
if (n_arr <= 0) {
return -1;
}
char *tmp = (char *)malloc(strlen(str) + 1);
strcpy(tmp, str);
char *context;
int n = 0;
char *token = strtok_r(tmp, separator, &context);
if (!token) {
goto ret;
}
strncpy(arr[n], token, TOKEN_LEN);
n++;
while (1) {
token = strtok_r(NULL, separator, &context);
if (!token || n >= n_arr) {
goto ret;
}
strncpy(arr[n], token, TOKEN_LEN);
n++;
}
ret:
free(tmp);
return n;
}
#ifndef TRAFT_UTIL_H
#define TRAFT_UTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "common.h"
int dirOK(const char *path);
int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr);
#ifdef __cplusplus
}
#endif
#endif
add_executable(rebalanceLeader "")
target_sources(rebalanceLeader
PRIVATE
"raftMain.c"
"raftServer.c"
)
target_link_libraries(rebalanceLeader PUBLIC traft lz4 uv_a)
#!/bin/bash
rm -rf 127.0.0.1*
rm -rf ./data
...@@ -60,9 +60,9 @@ void raftTransferCb(struct raft_transfer *req) { ...@@ -60,9 +60,9 @@ void raftTransferCb(struct raft_transfer *req) {
SRaftServer *pRaftServer = req->data; SRaftServer *pRaftServer = req->data;
raft_free(req); raft_free(req);
printf("raftTransferCb: \n"); //printf("raftTransferCb: \n");
updateLeaderStates(pRaftServer); updateLeaderStates(pRaftServer);
printLeaderCount(); //printLeaderCount();
int myLeaderCount; int myLeaderCount;
for (int i = 0; i < NODE_COUNT; ++i) { for (int i = 0; i < NODE_COUNT; ++i) {
...@@ -71,12 +71,13 @@ void raftTransferCb(struct raft_transfer *req) { ...@@ -71,12 +71,13 @@ void raftTransferCb(struct raft_transfer *req) {
} }
} }
printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); //printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
struct raft *r; struct raft *r;
for (int j = 0; j < pRaftServer->instanceCount; ++j) { for (int j = 0; j < pRaftServer->instanceCount; ++j) {
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
r = &pRaftServer->instance[j].raft; r = &pRaftServer->instance[j].raft;
break;
} }
} }
...@@ -87,17 +88,25 @@ void raftTransferCb(struct raft_transfer *req) { ...@@ -87,17 +88,25 @@ void raftTransferCb(struct raft_transfer *req) {
int minIndex = -1; int minIndex = -1;
int minLeaderCount = myLeaderCount; int minLeaderCount = myLeaderCount;
for (int j = 0; j < NODE_COUNT; ++j) { for (int j = 0; j < NODE_COUNT; ++j) {
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) {
continue;
}
if (leaderStates[j].leaderCount <= minLeaderCount) { if (leaderStates[j].leaderCount <= minLeaderCount) {
minLeaderCount = leaderStates[j].leaderCount;
minIndex = j; minIndex = j;
} }
} }
char myHost[48]; char myHost[48];
uint16_t myPort; uint16_t myPort;
uint16_t myVid; uint16_t myVid;
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
//printf("raftTransferCb transfer leader: vid[%u] choose: index:%d, leaderStates[%d].address:%s, leaderStates[%d].leaderCount:%d \n", minIndex, minIndex, leaderStates[minIndex].address, minIndex, leaderStates[minIndex].leaderCount);
char *destAddress = leaderStates[minIndex].address; char *destAddress = leaderStates[minIndex].address;
char tokens[MAX_PEERS][MAX_TOKEN_LEN]; char tokens[MAX_PEERS][MAX_TOKEN_LEN];
...@@ -106,6 +115,9 @@ void raftTransferCb(struct raft_transfer *req) { ...@@ -106,6 +115,9 @@ void raftTransferCb(struct raft_transfer *req) {
uint16_t destPort = atoi(tokens[1]); uint16_t destPort = atoi(tokens[1]);
destRaftId = encodeRaftId(destHost, destPort, myVid); destRaftId = encodeRaftId(destHost, destPort, myVid);
printf("\nraftTransferCb transfer leader: vgroupId:%u from:%s:%u --> to:%s:%u ", myVid, myHost, myPort, destHost, destPort);
fflush(stdout);
raft_transfer(r, transfer, destRaftId, raftTransferCb); raft_transfer(r, transfer, destRaftId, raftTransferCb);
} }
...@@ -252,7 +264,6 @@ const char* state2String(unsigned short state) { ...@@ -252,7 +264,6 @@ const char* state2String(unsigned short state) {
void printRaftState2(struct raft *r) { void printRaftState2(struct raft *r) {
char leaderAddress[128]; char leaderAddress[128];
memset(leaderAddress, 0, sizeof(leaderAddress)); memset(leaderAddress, 0, sizeof(leaderAddress));
...@@ -350,6 +361,7 @@ void console(SRaftServer *pRaftServer) { ...@@ -350,6 +361,7 @@ void console(SRaftServer *pRaftServer) {
while (1) { while (1) {
char cmd_buf[COMMAND_LEN]; char cmd_buf[COMMAND_LEN];
memset(cmd_buf, 0, sizeof(cmd_buf)); memset(cmd_buf, 0, sizeof(cmd_buf));
printf("(console)> ");
char *ret = fgets(cmd_buf, COMMAND_LEN, stdin); char *ret = fgets(cmd_buf, COMMAND_LEN, stdin);
if (!ret) { if (!ret) {
exit(-1); exit(-1);
...@@ -403,7 +415,10 @@ void console(SRaftServer *pRaftServer) { ...@@ -403,7 +415,10 @@ void console(SRaftServer *pRaftServer) {
} else if (strcmp(cmd, "dropnode") == 0) { } else if (strcmp(cmd, "dropnode") == 0) {
printf("not support \n"); printf("not support \n");
} else if (strcmp(cmd, "rebalance") == 0) { } else if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) {
exit(0);
} else if (strcmp(cmd, "rebalance") == 0 && strcmp(param1, "leader") == 0) {
/* /*
updateLeaderStates(pRaftServer); updateLeaderStates(pRaftServer);
...@@ -511,10 +526,14 @@ void console(SRaftServer *pRaftServer) { ...@@ -511,10 +526,14 @@ void console(SRaftServer *pRaftServer) {
printRaftState(&pRaftServer->instance[i].raft); printRaftState(&pRaftServer->instance[i].raft);
} }
} else if (strcmp(cmd, "state2") == 0) { } else if (strcmp(cmd, "leader") == 0 && strcmp(param1, "state") == 0) {
updateLeaderStates(pRaftServer);
printf("\n--------------------------------------------\n");
printLeaderCount();
for (int i = 0; i < pRaftServer->instanceCount; ++i) { for (int i = 0; i < pRaftServer->instanceCount; ++i) {
printRaftState2(&pRaftServer->instance[i].raft); printRaftState2(&pRaftServer->instance[i].raft);
} }
printf("--------------------------------------------\n");
} else if (strcmp(cmd, "snapshot") == 0) { } else if (strcmp(cmd, "snapshot") == 0) {
printf("not support \n"); printf("not support \n");
......
...@@ -3,32 +3,34 @@ ...@@ -3,32 +3,34 @@
#include "common.h" #include "common.h"
#include "raftServer.h" #include "raftServer.h"
char *keys; //char *keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);;
char *values; //char *values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
char keys[MAX_KV_LEN][MAX_RECORD_COUNT];
char values[MAX_KV_LEN][MAX_RECORD_COUNT];
int writeIndex = 0;
void initStore() { void initStore() {
keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
writeIndex = 0;
} }
void destroyStore() { void destroyStore() {
free(keys); //free(keys);
free(values); //free(values);
} }
void putKV(const char *key, const char *value) { void putKV(const char *key, const char *value) {
if (writeIndex < MAX_RECORD_COUNT) { if (writeIndex < MAX_RECORD_COUNT) {
strncpy(&keys[writeIndex], key, MAX_KV_LEN); strncpy(keys[writeIndex], key, MAX_KV_LEN);
strncpy(&values[writeIndex], value, MAX_KV_LEN); strncpy(values[writeIndex], value, MAX_KV_LEN);
writeIndex++; writeIndex++;
} }
} }
char *getKV(const char *key) { char *getKV(const char *key) {
for (int i = 0; i < MAX_RECORD_COUNT; ++i) { for (int i = 0; i < MAX_RECORD_COUNT; ++i) {
if (strcmp(&keys[i], key) == 0) { if (strcmp(keys[i], key) == 0) {
return &values[i]; return values[i];
} }
} }
return NULL; return NULL;
......
...@@ -15,11 +15,13 @@ extern "C" { ...@@ -15,11 +15,13 @@ extern "C" {
// simulate a db store, just for test // simulate a db store, just for test
#define MAX_KV_LEN 100 #define MAX_KV_LEN 20
#define MAX_RECORD_COUNT 500 #define MAX_RECORD_COUNT 16
char *keys;
char *values;
int writeIndex; //char *keys;
//char *values;
//int writeIndex;
void initStore(); void initStore();
void destroyStore(); void destroyStore();
......
...@@ -152,7 +152,7 @@ TEST(testCase, create_db_Test) { ...@@ -152,7 +152,7 @@ TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create database abc1"); TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
} }
...@@ -254,7 +254,7 @@ TEST(testCase, use_db_test) { ...@@ -254,7 +254,7 @@ TEST(testCase, use_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create database abc1"); TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
} }
...@@ -505,15 +505,17 @@ TEST(testCase, create_multiple_tables) { ...@@ -505,15 +505,17 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
// for(int32_t i = 0; i < 1000; ++i) { for(int32_t i = 0; i < 200000; ++i) {
// char sql[512] = {0}; char sql[512] = {0};
// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
// TAOS_RES* pres = taos_query(pConn, sql); TAOS_RES* pres = taos_query(pConn, sql);
// if (taos_errno(pres) != 0) { if (taos_errno(pres) != 0) {
// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
// } }
// taos_free_result(pres);
// } printf("%d\n", i);
taos_free_result(pres);
}
taos_close(pConn); taos_close(pConn);
} }
......
...@@ -22,12 +22,12 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); ...@@ -22,12 +22,12 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("query message is processed"); vTrace("query message is processed");
return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
} }
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("fetch message is processed"); vTrace("fetch message is processed");
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
......
...@@ -310,14 +310,21 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName ...@@ -310,14 +310,21 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
} }
if (NULL == vgInfo) { if (NULL == vgInfo) {
ctgError("no hash range found for hashvalue[%u]", hashValue); ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo));
void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL);
while (pIter1) {
vgInfo = pIter1;
ctgError("valid range:[%u, %u], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId);
pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1);
}
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
*pVgroup = *vgInfo; *pVgroup = *vgInfo;
_return: _return:
CTG_RET(TSDB_CODE_SUCCESS); CTG_RET(TSDB_CODE_SUCCESS);
} }
...@@ -773,7 +780,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, ...@@ -773,7 +780,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup));
_return: _return:
if (dbInfo) { if (dbInfo) {
CTG_UNLOCK(CTG_READ, &dbInfo->lock); CTG_UNLOCK(CTG_READ, &dbInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, dbInfo); taosHashRelease(pCatalog->dbCache.cache, dbInfo);
......
...@@ -115,7 +115,7 @@ typedef struct TFileCacheKey { ...@@ -115,7 +115,7 @@ typedef struct TFileCacheKey {
int32_t nColName; int32_t nColName;
} ICacheKey; } ICacheKey;
int indexFlushCacheTFile(SIndex* sIdx, void*); int indexFlushCacheToTFile(SIndex* sIdx, void*);
int32_t indexSerialCacheKey(ICacheKey* key, char* buf); int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
......
...@@ -21,9 +21,8 @@ ...@@ -21,9 +21,8 @@
#include "tskiplist.h" #include "tskiplist.h"
// ----------------- key structure in skiplist --------------------- // ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below: /* A data row, the format is like below
* content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->| * content: |<---colVal---->|<-- version--->|<-- uid--->|<-- colType --->|<--operaType--->|
* len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
*/ */
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -384,7 +384,6 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { ...@@ -384,7 +384,6 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
} }
} else { } else {
taosArrayPush(result, &tv); taosArrayPush(result, &tv);
// indexError("merge colVal: %s", tv->colVal);
} }
} }
static void indexDestroyTempResult(SArray* result) { static void indexDestroyTempResult(SArray* result) {
...@@ -395,7 +394,7 @@ static void indexDestroyTempResult(SArray* result) { ...@@ -395,7 +394,7 @@ static void indexDestroyTempResult(SArray* result) {
} }
taosArrayDestroy(result); taosArrayDestroy(result);
} }
int indexFlushCacheTFile(SIndex* sIdx, void* cache) { int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; } if (sIdx == NULL) { return -1; }
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
......
...@@ -45,9 +45,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in ...@@ -45,9 +45,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
return NULL; return NULL;
}; };
cache->mem = indexInternalCacheCreate(type); cache->mem = indexInternalCacheCreate(type);
cache->colName = tstrdup(colName);
cache->colName = calloc(1, strlen(colName) + 1);
memcpy(cache->colName, colName, strlen(colName));
cache->type = type; cache->type = type;
cache->index = idx; cache->index = idx;
cache->version = 0; cache->version = 0;
...@@ -187,8 +185,8 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -187,8 +185,8 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
break; break;
} else if (cache->imm != NULL) { } else if (cache->imm != NULL) {
// TODO: wake up by condition variable // TODO: wake up by condition variable
// pthread_mutex_unlock(&cache->mtx);
pthread_cond_wait(&cache->finished, &cache->mtx); pthread_cond_wait(&cache->finished, &cache->mtx);
// pthread_mutex_unlock(&cache->mtx);
// taosMsleep(50); // taosMsleep(50);
// pthread_mutex_lock(&cache->mtx); // pthread_mutex_lock(&cache->mtx);
} else { } else {
...@@ -353,7 +351,7 @@ static MemTable* indexInternalCacheCreate(int8_t type) { ...@@ -353,7 +351,7 @@ static MemTable* indexInternalCacheCreate(int8_t type) {
static void doMergeWork(SSchedMsg* msg) { static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle; IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index; SIndex* sidx = (SIndex*)pCache->index;
indexFlushCacheTFile(sidx, pCache); indexFlushCacheToTFile(sidx, pCache);
} }
static bool indexCacheIteratorNext(Iterate* itera) { static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter; SSkipListIterator* iter = itera->iter;
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
#include "tutil.h" #include "tutil.h"
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
// if (ctx->offset + len > ctx->limit) { return -1; }
if (ctx->type == TFile) { if (ctx->type == TFile) {
assert(len == tfWrite(ctx->file.fd, buf, len)); assert(len == tfWrite(ctx->file.fd, buf, len));
} else { } else {
......
...@@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT ...@@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf); code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(pKvRowBuilder);
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
} }
...@@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa ...@@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
const char* msg3 = "tag value too long"; const char* msg3 = "tag value too long";
const char* msg4 = "illegal value or data overflow"; const char* msg4 = "illegal value or data overflow";
int32_t code = 0;
STableMeta* pSuperTableMeta = NULL;
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// super table name, create table by using dst // super table name, create table by using dst
...@@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa ...@@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j);
SToken* pSTableNameToken = &pCreateTableInfo->stbName; SToken* pSTableNameToken = &pCreateTableInfo->stbName;
int32_t code = parserValidateNameToken(pSTableNameToken); code = parserValidateNameToken(pSTableNameToken);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
SName name = {0}; SName name = {0};
code = createSName(&name, pSTableNameToken, pCtx, pMsgBuf); code = createSName(&name, pSTableNameToken, pCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; goto _error;
} }
SKVRowBuilder kvRowBuilder = {0}; SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
} }
SArray* pValList = pCreateTableInfo->pTagVals; SArray* pValList = pCreateTableInfo->pTagVals;
size_t numOfInputTag = taosArrayGetSize(pValList); size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL;
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; goto _error;
} }
assert(pSuperTableMeta != NULL); assert(pSuperTableMeta != NULL);
...@@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa ...@@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
if (numOfInputTag != numOfBoundTags || schemaSize < numOfInputTag) { if (numOfInputTag != numOfBoundTags || schemaSize < numOfInputTag) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta); code = buildInvalidOperationMsg(pMsgBuf, msg2);
return buildInvalidOperationMsg(pMsgBuf, msg2); goto _error;
} }
bool findColumnIndex = false; bool findColumnIndex = false;
...@@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa ...@@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
if (pItem->pVar.nLen > pSchema->bytes) { if (pItem->pVar.nLen > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta); code = buildInvalidOperationMsg(pMsgBuf, msg3);
return buildInvalidOperationMsg(pMsgBuf, msg3); goto _error;
} }
} else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { } else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) {
...@@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa ...@@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true); code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true);
// check again after the convert since it may be converted from binary to nchar. // check again after the convert since it may be converted from binary to nchar.
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (IS_VAR_DATA_TYPE(pSchema->type)) {
int16_t len = varDataTLen(tagVal); int16_t len = varDataTLen(tagVal);
if (len > pSchema->bytes) { if (len > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta); code = buildInvalidOperationMsg(pMsgBuf, msg3);
return buildInvalidOperationMsg(pMsgBuf, msg3); goto _error;
} }
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta); code = buildInvalidOperationMsg(pMsgBuf, msg4);
return buildInvalidOperationMsg(pMsgBuf, msg4); goto _error;
} }
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
...@@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa ...@@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
} else { } else {
if (schemaSize != numOfInputTag) { if (schemaSize != numOfInputTag) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta); code = buildInvalidOperationMsg(pMsgBuf, msg2);
return buildInvalidOperationMsg(pMsgBuf, msg2); goto _error;
} }
code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf); code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta); goto _error;
return code;
} }
} }
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) { if (row == NULL) {
tfree(pSuperTableMeta); code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY; goto _error;
} }
tdSortKVRowByColIdx(row); tdSortKVRowByColIdx(row);
...@@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa ...@@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
SName tableName = {0}; SName tableName = {0};
code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf); code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tfree(pSuperTableMeta); goto _error;
return code;
} }
// Find a appropriate vgroup to accommodate this table , according to the table name // Find a appropriate vgroup to accommodate this table , according to the table name
SVgroupInfo info = {0}; SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); code = catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info); addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info);
tfree(pSuperTableMeta); tfree(pSuperTableMeta);
} }
*pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap); *pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap);
if (*pBufArray == NULL) {
code = terrno;
goto _error;
}
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error:
taosHashCleanup(pVgroupHashmap);
tfree(pSuperTableMeta);
terrno = code;
return code;
} }
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
...@@ -612,11 +626,12 @@ static int32_t doBuildSingleTableBatchReq(SName* pTableName, SArray* pColumns, S ...@@ -612,11 +626,12 @@ static int32_t doBuildSingleTableBatchReq(SName* pTableName, SArray* pColumns, S
int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
SArray* pBufArray = NULL; SArray* pBufArray = NULL;
int32_t code = 0;
// it is a sql statement to create a normal table // it is a sql statement to create a normal table
if (pCreateTable->childTableInfo == NULL) { if (pCreateTable->childTableInfo == NULL) {
assert(taosArrayGetSize(pCreateTable->colInfo.pColumns) > 0 && pCreateTable->colInfo.pTagColumns == NULL); assert(taosArrayGetSize(pCreateTable->colInfo.pColumns) > 0 && pCreateTable->colInfo.pTagColumns == NULL);
int32_t code = doCheckForCreateTable(pCreateTable, pMsgBuf); code = doCheckForCreateTable(pCreateTable, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -645,7 +660,10 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasic ...@@ -645,7 +660,10 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasic
destroyCreateTbReqBatch(&tbatch); destroyCreateTbReqBatch(&tbatch);
} else { // it is a child table, created according to a super table } else { // it is a child table, created according to a super table
doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray); code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray);
if (code != 0) {
return code;
}
} }
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo)); SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
......
...@@ -67,8 +67,8 @@ typedef struct SSchTask { ...@@ -67,8 +67,8 @@ typedef struct SSchTask {
int32_t msgLen; // msg length int32_t msgLen; // msg length
int8_t status; // task status int8_t status; // task status
SQueryNodeAddr execAddr; // task actual executed node address SQueryNodeAddr execAddr; // task actual executed node address
int8_t condidateIdx; // current try condidation index int8_t candidateIdx; // current try condidation index
SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SQueryProfileSummary summary; // task execution summary SQueryProfileSummary summary; // task execution summary
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
......
...@@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) { ...@@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) {
} }
static void cleanupTask(SSchTask* pTask) { static void cleanupTask(SSchTask* pTask) {
taosArrayDestroy(pTask->condidateAddrs); taosArrayDestroy(pTask->candidateAddrs);
} }
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) { int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
...@@ -226,20 +226,20 @@ _return: ...@@ -226,20 +226,20 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
if (task->condidateAddrs) { if (task->candidateAddrs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
task->condidateIdx = 0; task->candidateIdx = 0;
task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == task->condidateAddrs) { if (NULL == task->candidateAddrs) {
qError("taosArrayInit failed"); qError("taosArrayInit failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
if (task->plan->execNode.numOfEps > 0) { if (task->plan->execNode.numOfEps > 0) {
if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
qError("taosArrayPush failed"); qError("taosArrayPush failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { ...@@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
qError("taosArrayPush failed"); qError("taosArrayPush failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -273,19 +273,20 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { ...@@ -273,19 +273,20 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId);
qError("taosHashPut failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks),
pJob->queryId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -583,9 +584,12 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in ...@@ -583,9 +584,12 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
int32_t s = taosHashGetSize((*job)->execTasks);
assert(s != 0);
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) { if (NULL == task || NULL == (*task)) {
qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); qError("failed to get task, taskId:%"PRIx64" not exist, reqId:0x%"PRIx64, pParam->taskId, (*job)->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
...@@ -798,7 +802,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -798,7 +802,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
} }
SEpSet epSet; SEpSet epSet;
SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx); SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx);
schConvertAddrToEpSet(addr, &epSet); schConvertAddrToEpSet(addr, &epSet);
...@@ -816,15 +820,16 @@ _return: ...@@ -816,15 +820,16 @@ _return:
int32_t schLaunchTask(SSchJob *job, SSchTask *task) { int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SSubplan *plan = task->plan; SSubplan *plan = task->plan;
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); // NOTE: race condition: the task should be put into the hash table before send msg to server
SCH_ERR_RET(schPushTaskToExecList(job, task)); SCH_ERR_RET(schPushTaskToExecList(job, task));
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
task->status = JOB_TASK_STATUS_EXECUTING; task->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -975,7 +980,7 @@ _return: ...@@ -975,7 +980,7 @@ _return:
} }
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
if (NULL == transport || /* NULL == nodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册