diff --git a/src/kit/CMakeLists.txt b/src/kit/CMakeLists.txt index 66e8cf73988ab25db7544b9a52215d2279630c63..df3ce1000197b1add7f6454de7ed13c652a4d50d 100644 --- a/src/kit/CMakeLists.txt +++ b/src/kit/CMakeLists.txt @@ -4,3 +4,4 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(shell) ADD_SUBDIRECTORY(taosdemo) ADD_SUBDIRECTORY(taosdump) +ADD_SUBDIRECTORY(taosmigrate) \ No newline at end of file diff --git a/src/kit/taosmigrate/.gitignore b/src/kit/taosmigrate/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..77c52b2ee2d00da281cc6ddec32d4003c9b90c2b --- /dev/null +++ b/src/kit/taosmigrate/.gitignore @@ -0,0 +1,68 @@ +build/ +.vscode/ +.idea/ +cmake-build-debug/ +cscope.out +.DS_Store +debug/ +release/ +target/ +debs/ +rpms/ +mac/ +*.pyc +.mypy_cache +*.tmp +*.swp +src/connector/nodejs/node_modules/ +src/connector/nodejs/out/ +tests/test/ +tests/taoshebei/ +tests/taoscsv/ +tests/taosdalipu/ +tests/jenkins/ +tests/hdfs/ +*.iml +*.class +nmake/ +sln/ +hdfs/ +c/ +taoshebei/ +taosdalipu/ +Target/ +*.failed +*.sql +sim/ +psim/ +pysim/ +*.out +*DS_Store + +# Doxygen Generated files +html/ +/.vs +/CMakeFiles/3.10.2 +/CMakeCache.txt +/Makefile +/*.cmake +/src/cq/test/CMakeFiles/cqtest.dir/*.cmake +*.cmake +/src/cq/test/CMakeFiles/cqtest.dir/*.make +*.make +link.txt +*.internal +*.includecache +*.marks +Makefile +CMakeError.log +*.log +/CMakeFiles/CMakeRuleHashes.txt +/CMakeFiles/Makefile2 +/CMakeFiles/TargetDirectories.txt +/CMakeFiles/cmake.check_cache +/out/isenseconfig/WSL-Clang-Debug +/out/isenseconfig/WSL-GCC-Debug +/test/cfg +/src/.vs +*.o diff --git a/src/kit/taosmigrate/CMakeLists.txt b/src/kit/taosmigrate/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f8581853922e17ce9cb123f95e13024c06e5f7d0 --- /dev/null +++ b/src/kit/taosmigrate/CMakeLists.txt @@ -0,0 +1,16 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) + AUX_SOURCE_DIRECTORY(. SRC) + + ADD_EXECUTABLE(taosmigrate ${SRC}) + TARGET_LINK_LIBRARIES(taosmigrate common tutil cJson) +ENDIF () \ No newline at end of file diff --git a/src/kit/taosmigrate/taosmigrate.c b/src/kit/taosmigrate/taosmigrate.c new file mode 100644 index 0000000000000000000000000000000000000000..eb72b22e30835c8229707347064d6fee9fdb2253 --- /dev/null +++ b/src/kit/taosmigrate/taosmigrate.c @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "taosmigrate.h" + + +/* The options we understand. */ +static struct argp_option options[] = { + {0, 'r', "data dir", 0, "data dir", 0}, + {0, 'd', "dnodeId", 0, "dnode id", 1}, + {0, 'p', "port", 0, "dnode port", 1}, + {0, 'f', "fqdn", 0, "dnode fqdn", 1}, + {0, 'e', "ep", 0, "dnode ep", 1}, + {0, 'g', "multi dnodes", 0, "multi dnode info, e.g. \"2 7030 fqdn1 ep1:7030, 3 8030 fqdn2 ep2:8030\"", 2}, + {0}}; + +/* Used by main to communicate with parse_opt. */ +struct arguments { + char* dataDir; + int32_t dnodeId; + uint16_t port; + char* fqdn; + char* ep; + char* dnodeGroups; + char** arg_list; + int arg_list_len; +}; + +/* Parse a single option. */ +static error_t parse_opt(int key, char *arg, struct argp_state *state) { + struct arguments *arguments = state->input; + switch (key) { + case 'w': + arguments->dataDir = arg; + break; + case 'd': + arguments->dnodeId = atoi(arg); + break; + case 'p': + arguments->port = atoi(arg); + break; + case 'f': + arguments->fqdn = arg; + case 'e': + arguments->ep = arg; + break; + case 'g': + arguments->dnodeGroups = arg; + break; + case ARGP_KEY_ARG: + arguments->arg_list = &state->argv[state->next - 1]; + arguments->arg_list_len = state->argc - state->next + 1; + state->next = state->argc; + + argp_usage(state); + break; + + default: + return ARGP_ERR_UNKNOWN; + } + return 0; +} + +static struct argp argp = {options, parse_opt, 0, 0}; +struct arguments arguments = {NULL, 0, 0, NULL, NULL, NULL, NULL, 0}; +SdnodeGroup tsDnodeGroup = {0}; + +int tSystemShell(const char * cmd) +{ + FILE * fp; + int res; + char buf[1024]; + if (cmd == NULL) { + printf("tSystem cmd is NULL!\n"); + return -1; + } + + if ((fp = popen(cmd, "r") ) == NULL) { + printf("popen cmd:%s error: %s/n", cmd, strerror(errno)); + return -1; + } else { + while(fgets(buf, sizeof(buf), fp)) { + printf("popen result:%s", buf); + } + + if ((res = pclose(fp)) == -1) { + printf("close popen file pointer fp error!\n"); + } else { + printf("popen res is :%d\n", res); + } + + return res; + } +} + +void taosMvFile(char* destFile, char *srcFile) { + char shellCmd[1024+1] = {0}; + + //(void)snprintf(shellCmd, 1024, "cp -rf %s %s", srcDir, destDir); + (void)snprintf(shellCmd, 1024, "mv -f %s %s", srcFile, destFile); + tSystemShell(shellCmd); +} + +SdnodeIfo* getDnodeInfo(int32_t dnodeId) +{ + for (int32_t i = 0; i < tsDnodeGroup.dnodeNum; i++) { + if (dnodeId == tsDnodeGroup.dnodeArray[i].dnodeId) { + return &(tsDnodeGroup.dnodeArray[i]); + } + } + + return NULL; +} + +void parseOneDnodeInfo(char* buf, SdnodeIfo* pDnodeInfo) +{ + char *ptr; + char *p; + int32_t i = 0; + ptr = strtok_r(buf, " ", &p); + while(ptr != NULL){ + if (0 == i) { + pDnodeInfo->dnodeId = atoi(ptr); + } else if (1 == i) { + pDnodeInfo->port = atoi(ptr); + } else if (2 == i) { + tstrncpy(pDnodeInfo->fqdn, ptr, TSDB_FQDN_LEN); + } else if (3 == i) { + tstrncpy(pDnodeInfo->ep, ptr, TSDB_EP_LEN); + } else { + printf("input parameter error near:%s\n", buf); + exit(-1); + } + i++; + ptr = strtok_r(NULL, " ", &p); + } +} + +void saveDnodeGroups() +{ + if ((NULL != arguments.fqdn) && (NULL != arguments.ep) && (arguments.dnodeId > 0) && (0 != arguments.port)) { + //printf("dnodeId:%d port:%d fqdn:%s ep:%s\n", arguments.dnodeId, arguments.port, arguments.fqdn, arguments.ep); + + tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].dnodeId = arguments.dnodeId; + tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].port = arguments.port; + tstrncpy(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].fqdn, arguments.fqdn, TSDB_FQDN_LEN); + tstrncpy(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].ep, arguments.ep, TSDB_EP_LEN); + + tsDnodeGroup.dnodeNum++; + } + + if (NULL == arguments.dnodeGroups) { + return; + } + + //printf("dnodeGroups:%s\n", arguments.dnodeGroups); + + char buf[1024]; + char* str = NULL; + char* start = arguments.dnodeGroups; + while (NULL != (str = strstr(start, ","))) { + memcpy(buf, start, str - start); + // parse one dnode info: dnodeId port fqdn ep + parseOneDnodeInfo(buf, &(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum])); + tsDnodeGroup.dnodeNum++; + // next + start = str + 1; + str = NULL; + } + + if (strlen(start)) { + parseOneDnodeInfo(start, &(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum])); + tsDnodeGroup.dnodeNum++; + } +} + +int32_t main(int32_t argc, char *argv[]) { + memset(&tsDnodeGroup, 0, sizeof(SdnodeGroup)); + + argp_parse(&argp, argc, argv, 0, 0, &arguments); + + if ((NULL == arguments.dataDir) || ((NULL == arguments.dnodeGroups) + && (NULL == arguments.fqdn || NULL == arguments.ep || arguments.dnodeId < 1 || 0 == arguments.port))) { + printf("input parameter error!\n"); + return -1; + } + + saveDnodeGroups(); + + printf("===================arguments:==================\n"); + printf("oldWal:%s\n", arguments.dataDir); + for (int32_t i = 0; i < tsDnodeGroup.dnodeNum; i++) { + printf("dnodeId:%d port:%d fqdn:%s ep:%s\n", tsDnodeGroup.dnodeArray[i].dnodeId, + tsDnodeGroup.dnodeArray[i].port, + tsDnodeGroup.dnodeArray[i].fqdn, + tsDnodeGroup.dnodeArray[i].ep); + } + printf("===========================\n"); + + // 1. modify wal for mnode + char mnodeWal[TSDB_FILENAME_LEN*2] = {0}; + (void)snprintf(mnodeWal, TSDB_FILENAME_LEN*2, "%s/mnode/wal/wal0", arguments.dataDir); + walModWalFile(mnodeWal); + + // 2. modfiy dnode config: mnodeIpList.json + char dnodeIpList[TSDB_FILENAME_LEN*2] = {0}; + (void)snprintf(dnodeIpList, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeIpList.json", arguments.dataDir); + modDnodeIpList(dnodeIpList); + + // 3. modify vnode config: config.json + char vnodeDir[TSDB_FILENAME_LEN*2] = {0}; + (void)snprintf(vnodeDir, TSDB_FILENAME_LEN*2, "%s/vnode", arguments.dataDir); + modAllVnode(vnodeDir); + + return 0; +} + diff --git a/src/kit/taosmigrate/taosmigrate.h b/src/kit/taosmigrate/taosmigrate.h new file mode 100644 index 0000000000000000000000000000000000000000..a0a02e651cac94502bd3ccecfac54fc795544cde --- /dev/null +++ b/src/kit/taosmigrate/taosmigrate.h @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TAOS_MIGRATE_H +#define TAOS_MIGRATE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define _GNU_SOURCE + +#ifndef _ALPINE +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "taosdef.h" +#include "tutil.h" +#include "twal.h" +#include "tchecksum.h" +#include "mnodeDef.h" +#include "mnodeSdb.h" +#include "cJSON.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tsdb.h" + +//#include "vnode.h" +#include "vnodeInt.h" + +#define MAX_DNODE_NUM 128 + + +typedef struct _SdnodeIfo { + int32_t dnodeId; + uint16_t port; + char fqdn[TSDB_FQDN_LEN+1]; + char ep[TSDB_EP_LEN+1]; +} SdnodeIfo; + +typedef struct _SdnodeGroup { + int32_t dnodeNum; + SdnodeIfo dnodeArray[MAX_DNODE_NUM]; +} SdnodeGroup; + +int tSystemShell(const char * cmd); +void taosMvFile(char* destFile, char *srcFile) ; +void walModWalFile(char* walfile); +SdnodeIfo* getDnodeInfo(int32_t dnodeId); +void modDnodeIpList(char* dnodeIpList); +void modAllVnode(char *vnodeDir); + +#endif diff --git a/src/kit/taosmigrate/taosmigrateDnodeCfg.c b/src/kit/taosmigrate/taosmigrateDnodeCfg.c new file mode 100644 index 0000000000000000000000000000000000000000..1d4af7f96b955aad3032fa73fff7fe435e43f6d2 --- /dev/null +++ b/src/kit/taosmigrate/taosmigrateDnodeCfg.c @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "taosmigrate.h" + +//#include "dnodeInt.h" +//#include "dnodeMgmt.h" +//#include "dnodeVRead.h" +//#include "dnodeVWrite.h" +//#include "dnodeModule.h" + +static SDMMnodeInfos tsDnodeIpInfos = {0}; + +static bool dnodeReadMnodeInfos(char* dnodeIpList) { + FILE *fp = fopen(dnodeIpList, "r"); + if (!fp) { + printf("failed to read mnodeIpList.json, file not exist\n"); + return false; + } + + bool ret = false; + int maxLen = 2000; + char *content = calloc(1, maxLen + 1); + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + free(content); + fclose(fp); + printf("failed to read mnodeIpList.json, content is null\n"); + return false; + } + + content[len] = 0; + cJSON* root = cJSON_Parse(content); + if (root == NULL) { + printf("failed to read mnodeIpList.json, invalid json format\n"); + goto PARSE_OVER; + } + + cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); + if (!inUse || inUse->type != cJSON_Number) { + printf("failed to read mnodeIpList.json, inUse not found\n"); + goto PARSE_OVER; + } + tsDnodeIpInfos.inUse = inUse->valueint; + + cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); + if (!nodeNum || nodeNum->type != cJSON_Number) { + printf("failed to read mnodeIpList.json, nodeNum not found\n"); + goto PARSE_OVER; + } + tsDnodeIpInfos.nodeNum = nodeNum->valueint; + + cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + printf("failed to read mnodeIpList.json, nodeInfos not found\n"); + goto PARSE_OVER; + } + + int size = cJSON_GetArraySize(nodeInfos); + if (size != tsDnodeIpInfos.nodeNum) { + printf("failed to read mnodeIpList.json, nodeInfos size not matched\n"); + goto PARSE_OVER; + } + + for (int i = 0; i < size; ++i) { + cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + printf("failed to read mnodeIpList.json, nodeId not found\n"); + goto PARSE_OVER; + } + tsDnodeIpInfos.nodeInfos[i].nodeId = nodeId->valueint; + + cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); + if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { + printf("failed to read mnodeIpList.json, nodeName not found\n"); + goto PARSE_OVER; + } + strncpy(tsDnodeIpInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); + } + + ret = true; + + //printf("read mnode iplist successed, numOfIps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse); + //for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) { + // printf("mnode:%d, %s\n", tsDnodeIpInfos.nodeInfos[i].nodeId, tsDnodeIpInfos.nodeInfos[i].nodeEp); + //} + +PARSE_OVER: + free(content); + cJSON_Delete(root); + fclose(fp); + return ret; +} + + +static void dnodeSaveMnodeInfos(char* dnodeIpList) { + FILE *fp = fopen(dnodeIpList, "w"); + if (!fp) return; + + int32_t len = 0; + int32_t maxLen = 2000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDnodeIpInfos.inUse); + len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDnodeIpInfos.nodeNum); + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); + for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) { + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDnodeIpInfos.nodeInfos[i].nodeId); + len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDnodeIpInfos.nodeInfos[i].nodeEp); + if (i < tsDnodeIpInfos.nodeNum -1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fflush(fp); + fclose(fp); + free(content); + + printf("mod mnode iplist successed\n"); +} + +void modDnodeIpList(char* dnodeIpList) +{ + (void)dnodeReadMnodeInfos(dnodeIpList); + dnodeSaveMnodeInfos(dnodeIpList); + return; +} + + diff --git a/src/kit/taosmigrate/taosmigrateMnodeWal.c b/src/kit/taosmigrate/taosmigrateMnodeWal.c new file mode 100644 index 0000000000000000000000000000000000000000..6315ff99f79cf3414fc147ac2913cc7733069e4b --- /dev/null +++ b/src/kit/taosmigrate/taosmigrateMnodeWal.c @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "taosmigrate.h" + +static void recordWrite(int fd, SWalHead *pHead) { + + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); + + int contLen = pHead->len + sizeof(SWalHead); + + if(write(fd, pHead, contLen) != contLen) { + printf("failed to write(%s)", strerror(errno)); + exit(-1); + } +} + +static void recordMod(SWalHead* pHead) +{ + SDnodeObj *pDnode; + + ESdbTable tableId = (ESdbTable)(pHead->msgType / 10); + + switch (tableId) { + case SDB_TABLE_DNODE: + case SDB_TABLE_MNODE: + pDnode = (SDnodeObj *)pHead->cont; + + printf("dnodeId:%d port:%d fqdn:%s ep:%s\n", pDnode->dnodeId, pDnode->dnodePort, pDnode->dnodeFqdn, pDnode->dnodeEp); + + SdnodeIfo* pDnodeInfo = getDnodeInfo(pDnode->dnodeId); + if (NULL == pDnodeInfo) { + break; + } + + pDnode->dnodePort = pDnodeInfo->port; + tstrncpy(pDnode->dnodeFqdn, pDnodeInfo->fqdn, sizeof(pDnode->dnodeFqdn)); + tstrncpy(pDnode->dnodeEp, pDnodeInfo->ep, sizeof(pDnode->dnodeEp)); + break; + #if 0 + case SDB_TABLE_ACCOUNT: + SAcctObj *pAcct = (SDnodeObj *)pHead->cont; + break; + case SDB_TABLE_USER: + SUserObj *pUser = (SDnodeObj *)pHead->cont; + break; + case SDB_TABLE_DB: + SDbObj *pDb = (SDnodeObj *)pHead->cont; + break; + case SDB_TABLE_VGROUP: + SVgObj *pVgroup = (SDnodeObj *)pHead->cont; + break; + case SDB_TABLE_STABLE: + SSuperTableObj *pStable = (SDnodeObj *)pHead->cont; + break; + case SDB_TABLE_CTABLE: + SChildTableObj *pCTable = (SDnodeObj *)pHead->cont; + break; + #endif + default: + break; + } +} + +void walModWalFile(char* walfile) { + char *buffer = malloc(1024000); // size for one record + if (buffer == NULL) { + printf("failed to malloc:%s\n", strerror(errno)); + return ; + } + + SWalHead *pHead = (SWalHead *)buffer; + + int rfd = open(walfile, O_RDONLY); + if (rfd < 0) { + printf("failed to open %s failed:%s\n", walfile, strerror(errno)); + free(buffer); + return ; + } + + char newWalFile[32] = "wal0"; + int wfd = open(newWalFile, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + + if (wfd < 0) { + printf("wal:%s, failed to open(%s)\n", newWalFile, strerror(errno)); + free(buffer); + return ; + } + + printf("start to mod %s into %s\n", walfile, newWalFile); + + while (1) { + memset(buffer, 0, 1024000); + int ret = read(rfd, pHead, sizeof(SWalHead)); + if ( ret == 0) break; + + if (ret != sizeof(SWalHead)) { + printf("wal:%s, failed to read head, skip, ret:%d(%s)\n", walfile, ret, strerror(errno)); + break; + } + + if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { + printf("wal:%s, cksum is messed up, skip the rest of file\n", walfile); + break; + } + + ret = read(rfd, pHead->cont, pHead->len); + if ( ret != pHead->len) { + printf("wal:%s, failed to read body, skip, len:%d ret:%d\n", walfile, pHead->len, ret); + break; + } + + recordMod(pHead); + recordWrite(wfd, pHead); + } + + close(rfd); + close(wfd); + free(buffer); + + taosMvFile(walfile, newWalFile); + + return ; +} + + + diff --git a/src/kit/taosmigrate/taosmigrateVnodeCfg.c b/src/kit/taosmigrate/taosmigrateVnodeCfg.c new file mode 100644 index 0000000000000000000000000000000000000000..2619b348e9eec2f837c8a8dd1734e176c9c25446 --- /dev/null +++ b/src/kit/taosmigrate/taosmigrateVnodeCfg.c @@ -0,0 +1,315 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "taosmigrate.h" + + +static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile) +{ + FILE *fp = fopen(cfgFile, "w"); + if (!fp) { + printf("failed to open vnode cfg file for write, file:%s error:%s\n", cfgFile, strerror(errno)); + return errno; + } + + int32_t len = 0; + int32_t maxLen = 1000; + char * content = calloc(1, maxLen + 1); + if (content == NULL) { + fclose(fp); + return -1; + } + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pVnode->db); + len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnode->cfgVersion); + len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnode->tsdbCfg.cacheBlockSize); + len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnode->tsdbCfg.totalBlocks); + len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnode->tsdbCfg.maxTables); + len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnode->tsdbCfg.daysPerFile); + len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnode->tsdbCfg.keep); + len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnode->tsdbCfg.keep1); + len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnode->tsdbCfg.keep2); + len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.minRowsPerFileBlock); + len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.maxRowsPerFileBlock); + len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnode->tsdbCfg.commitTime); + len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision); + len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression); + len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel); + len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnode->syncCfg.replica); + len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnode->walCfg.wals); + len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnode->syncCfg.quorum); + + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); + for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnode->syncCfg.nodeInfo[i].nodeId); + len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s:%d\"\n", pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort); + + if (i < pVnode->syncCfg.replica - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fflush(fp); + fclose(fp); + free(content); + + printf("mod vnode cfg %s successed\n", cfgFile); + + return 0; +} + +static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile) +{ + cJSON *root = NULL; + char *content = NULL; + int maxLen = 1000; + int32_t ret = -1; + + FILE *fp = fopen(cfgFile, "r"); + if (!fp) { + printf("failed to open vnode cfg file:%s to read, error:%s\n", cfgFile, strerror(errno)); + goto PARSE_OVER; + } + + content = calloc(1, maxLen + 1); + if (content == NULL) { + goto PARSE_OVER; + } + + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + printf("failed to read vnode cfg, content is null, error:%s\n", strerror(errno)); + goto PARSE_OVER; + } + + root = cJSON_Parse(content); + if (root == NULL) { + printf("failed to json parse %s, invalid json format\n", cfgFile); + goto PARSE_OVER; + } + + cJSON *db = cJSON_GetObjectItem(root, "db"); + if (!db || db->type != cJSON_String || db->valuestring == NULL) { + printf("vgId:%d, failed to read vnode cfg, db not found\n", pVnode->vgId); + goto PARSE_OVER; + } + strcpy(pVnode->db, db->valuestring); + + cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion"); + if (!cfgVersion || cfgVersion->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, cfgVersion not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->cfgVersion = cfgVersion->valueint; + + cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); + if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, cacheBlockSize not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.cacheBlockSize = cacheBlockSize->valueint; + + cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); + if (!totalBlocks || totalBlocks->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, totalBlocks not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint; + + cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); + if (!maxTables || maxTables->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.maxTables = maxTables->valueint; + + cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); + if (!daysPerFile || daysPerFile->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, daysPerFile not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint; + + cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep"); + if (!daysToKeep || daysToKeep->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, daysToKeep not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.keep = daysToKeep->valueint; + + cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); + if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, daysToKeep1 not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.keep1 = daysToKeep1->valueint; + + cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); + if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, daysToKeep2 not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.keep2 = daysToKeep2->valueint; + + cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); + if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint; + + cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); + if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, maxRowsPerFileBlock not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; + + cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime"); + if (!commitTime || commitTime->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint; + + cJSON *precision = cJSON_GetObjectItem(root, "precision"); + if (!precision || precision->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, precision not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.precision = (int8_t)precision->valueint; + + cJSON *compression = cJSON_GetObjectItem(root, "compression"); + if (!compression || compression->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, compression not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.compression = (int8_t)compression->valueint; + + cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel"); + if (!walLevel || walLevel->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, walLevel not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->walCfg.walLevel = (int8_t) walLevel->valueint; + + cJSON *wals = cJSON_GetObjectItem(root, "wals"); + if (!wals || wals->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, wals not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->walCfg.wals = (int8_t)wals->valueint; + pVnode->walCfg.keep = 0; + + cJSON *replica = cJSON_GetObjectItem(root, "replica"); + if (!replica || replica->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, replica not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->syncCfg.replica = (int8_t)replica->valueint; + + cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); + if (!quorum || quorum->type != cJSON_Number) { + printf("vgId: %d, failed to read vnode cfg, quorum not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->syncCfg.quorum = (int8_t)quorum->valueint; + + cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + printf("vgId:%d, failed to read vnode cfg, nodeInfos not found\n", pVnode->vgId); + goto PARSE_OVER; + } + + int size = cJSON_GetArraySize(nodeInfos); + if (size != pVnode->syncCfg.replica) { + printf("vgId:%d, failed to read vnode cfg, nodeInfos size not matched\n", pVnode->vgId); + goto PARSE_OVER; + } + + for (int i = 0; i < size; ++i) { + cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + printf("vgId:%d, failed to read vnode cfg, nodeId not found\n", pVnode->vgId); + goto PARSE_OVER; + } + pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint; + + cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); + if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { + printf("vgId:%d, failed to read vnode cfg, nodeFqdn not found\n", pVnode->vgId); + goto PARSE_OVER; + } + + taosGetFqdnPortFromEp(nodeEp->valuestring, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort); + //pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; + } + + ret = 0; + //printf("read vnode cfg successfully, replcia:%d\n", pVnode->syncCfg.replica); + //for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { + // printf("dnode:%d, %s:%d\n", pVnode->syncCfg.nodeInfo[i].nodeId, pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort); + //} + +PARSE_OVER: + tfree(content); + cJSON_Delete(root); + if (fp) fclose(fp); + return ret; +} + +static void modVnodeCfg(char* vnodeCfg) +{ + int32_t ret; + SVnodeObj vnodeObj = {0}; + ret = readVnodeCfg(&vnodeObj, vnodeCfg); + if (0 != ret) { + printf("read vnode cfg %s fail!\n", vnodeCfg); + return ; + } + + (void)saveVnodeCfg(&vnodeObj, vnodeCfg); + + return ; +} + +void modAllVnode(char *vnodeDir) +{ + DIR *dir = opendir(vnodeDir); + if (dir == NULL) return; + + char filename[1024]; + struct dirent *de = NULL; + while ((de = readdir(dir)) != NULL) { + if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue; + + if ((de->d_type & DT_DIR) && (strncmp(de->d_name, "vnode", 5) == 0)) { + memset(filename, 0, 1024); + snprintf(filename, 1023, "%s/%s/config.json", vnodeDir, de->d_name); + modVnodeCfg(filename); + } + } + + closedir(dir); +} +