提交 2a83bdd3 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.6.0")
SET(TD_VER_NUMBER "2.0.7.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -48,7 +48,7 @@ cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_pat
cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/../packaging/tools/preun.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_path}/bin
#cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
......
......@@ -58,7 +58,7 @@ cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/scri
cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin
#cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include
......
#!/bin/bash
#
log_dir=$1
result_file=$2
if [ ! -n "$1" ];then
echo "Pleas input the director of taosdlog."
echo "usage: ./get_client.sh <taosdlog directory> <result file>"
exit 1
else
log_dir=$1
fi
if [ ! -n "$2" ];then
result_file=clientInfo.txt
else
result_file=$2
fi
grep "new TCP connection" ${log_dir}/taosdlog.* | sed -e "s/0x.* from / /"|sed -e "s/,.*$//"|sed -e "s/:[0-9]*$//"|sort -r|uniq -f 2|sort -k 3 -r|uniq -f 2 > ${result_file}
......@@ -45,8 +45,7 @@ if [ "$osType" != "Darwin" ]; then
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh"
else
#bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh ${script_dir}/get_client.sh"
fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
else
......
......@@ -76,8 +76,10 @@ if [ "$osType" != "Darwin" ]; then
else
cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -36,8 +36,7 @@ if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
else
#bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh ${script_dir}/get_client.sh"
fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
......
......@@ -77,8 +77,10 @@ else
cp ${build_dir}/bin/taosd ${install_dir}/bin/powerd
cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${build_dir}/bin/tarbitrator ${install_dir}/bin
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
fi
chmod a+x ${install_dir}/bin/* || :
......
name: tdengine
base: core18
version: '2.0.6.0'
version: '2.0.7.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......
......@@ -241,11 +241,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.code = 0
};
// NOTE: the rpc context should be acquired before sending data to server.
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
return TSDB_CODE_SUCCESS;
}
......
......@@ -105,6 +105,7 @@ void taos_init_imp(void) {
taosReadGlobalCfg();
taosCheckGlobalCfg();
rpcInit();
tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
}
......@@ -179,6 +180,7 @@ void taos_cleanup(void) {
taosCloseRef(tscRefId);
taosCleanupKeywordsTable();
taosCloseLog();
if (tscEmbedded == 0) rpcCleanup();
m = tscTmr;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
......
......@@ -20,6 +20,7 @@
#include "tconfig.h"
#include "tglobal.h"
#include "twal.h"
#include "trpc.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
......@@ -54,6 +55,7 @@ typedef struct {
} SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = {
{"rpc", rpcInit, rpcCleanup},
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps},
......
......@@ -169,7 +169,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsClientRpc, epSet, rpcMsg);
rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL);
}
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
......@@ -180,4 +180,4 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp);
}
\ No newline at end of file
}
......@@ -78,12 +78,14 @@ typedef struct SRpcInit {
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit;
int32_t rpcInit();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
......
......@@ -3,3 +3,4 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(shell)
ADD_SUBDIRECTORY(taosdemo)
ADD_SUBDIRECTORY(taosdump)
......@@ -46,7 +46,7 @@ static struct argp_option options[] = {
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, valid option: client | server."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|clients|server."},
{"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{0}};
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(. SRC)
IF (TD_LINUX)
ADD_EXECUTABLE(taosdump ${SRC})
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdump taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(taosdump taos)
ENDIF ()
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iconv.h>
#include "os.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tsclient.h"
#include "tsdb.h"
#include "tutil.h"
#define COMMAND_SIZE 65536
//#define DEFAULT_DUMP_FILE "taosdump.sql"
int converStringToReadable(char *str, int size, char *buf, int bufsize);
int convertNCharToReadable(char *str, int size, char *buf, int bufsize);
void taosDumpCharset(FILE *fp);
void taosLoadFileCharset(FILE *fp, char *fcharset);
typedef struct {
short bytes;
int8_t type;
} SOColInfo;
// -------------------------- SHOW DATABASE INTERFACE-----------------------
enum _show_db_index {
TSDB_SHOW_DB_NAME_INDEX,
TSDB_SHOW_DB_CREATED_TIME_INDEX,
TSDB_SHOW_DB_VGROUPS_INDEX,
TSDB_SHOW_DB_NTABLES_INDEX,
TSDB_SHOW_DB_REPLICA_INDEX,
TSDB_SHOW_DB_DAYS_INDEX,
TSDB_SHOW_DB_KEEP_INDEX,
TSDB_SHOW_DB_TABLES_INDEX,
TSDB_SHOW_DB_ROWS_INDEX,
TSDB_SHOW_DB_CACHE_INDEX,
TSDB_SHOW_DB_ABLOCKS_INDEX,
TSDB_SHOW_DB_TBLOCKS_INDEX,
TSDB_SHOW_DB_CTIME_INDEX,
TSDB_SHOW_DB_CLOG_INDEX,
TSDB_SHOW_DB_COMP_INDEX,
TSDB_MAX_SHOW_DB
};
// -----------------------------------------SHOW TABLES CONFIGURE -------------------------------------
enum _show_tables_index {
TSDB_SHOW_TABLES_NAME_INDEX,
TSDB_SHOW_TABLES_CREATED_TIME_INDEX,
TSDB_SHOW_TABLES_COLUMNS_INDEX,
TSDB_SHOW_TABLES_METRIC_INDEX,
TSDB_MAX_SHOW_TABLES
};
// ---------------------------------- DESCRIBE METRIC CONFIGURE ------------------------------
enum _describe_table_index {
TSDB_DESCRIBE_METRIC_FIELD_INDEX,
TSDB_DESCRIBE_METRIC_TYPE_INDEX,
TSDB_DESCRIBE_METRIC_LENGTH_INDEX,
TSDB_DESCRIBE_METRIC_NOTE_INDEX,
TSDB_MAX_DESCRIBE_METRIC
};
typedef struct {
char field[TSDB_COL_NAME_LEN + 1];
char type[16];
int length;
char note[128];
} SColDes;
typedef struct {
char name[TSDB_COL_NAME_LEN + 1];
SColDes cols[];
} STableDef;
extern char version[];
typedef struct {
char name[TSDB_DB_NAME_LEN + 1];
int32_t replica;
int32_t days;
int32_t keep;
int32_t tables;
int32_t rows;
int32_t cache;
int32_t ablocks;
int32_t tblocks;
int32_t ctime;
int32_t clog;
int32_t comp;
} SDbInfo;
typedef struct {
char name[TSDB_TABLE_NAME_LEN + 1];
char metric[TSDB_TABLE_NAME_LEN + 1];
} STableRecord;
typedef struct {
bool isMetric;
STableRecord tableRecord;
} STableRecordInfo;
typedef struct {
pthread_t threadID;
int32_t threadIndex;
int32_t totalThreads;
char dbName[TSDB_TABLE_NAME_LEN + 1];
void *taosCon;
} SThreadParaObj;
static int64_t totalDumpOutRows = 0;
SDbInfo **dbInfos = NULL;
const char *argp_program_version = version;
const char *argp_program_bug_address = "<support@taosdata.com>";
/* Program documentation. */
static char doc[] = "";
/* "Argp example #4 -- a program with somewhat more complicated\ */
/* options\ */
/* \vThis part of the documentation comes *after* the options;\ */
/* note that the text is automatically filled, but it's possible\ */
/* to force a line-break, e.g.\n<-- here."; */
/* A description of the arguments we accept. */
static char args_doc[] = "dbname [tbname ...]\n--databases dbname ...\n--all-databases\n-i inpath\n-o outpath";
/* Keys for options without short-options. */
#define OPT_ABORT 1 /* –abort */
/* The options we understand. */
static struct argp_option options[] = {
// connection option
{"host", 'h', "HOST", 0, "Server host dumping data from. Default is localhost.", 0},
{"user", 'u', "USER", 0, "User name used to connect to server. Default is root.", 0},
#ifdef _TD_POWER_
{"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is powerdb.", 0},
#else
{"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is taosdata.", 0},
#endif
{"port", 'P', "PORT", 0, "Port to connect", 0},
{"cversion", 'v', "CVERION", 0, "client version", 0},
{"mysqlFlag", 'q', "MYSQLFLAG", 0, "mysqlFlag, Default is 0", 0},
// input/output file
{"outpath", 'o', "OUTPATH", 0, "Output file path.", 1},
{"inpath", 'i', "INPATH", 0, "Input file path.", 1},
#ifdef _TD_POWER_
{"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/power/taos.cfg.", 1},
#else
{"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/taos/taos.cfg.", 1},
#endif
{"encode", 'e', "ENCODE", 0, "Input file encoding.", 1},
// dump unit options
{"all-databases", 'A', 0, 0, "Dump all databases.", 2},
{"databases", 'B', 0, 0, "Dump assigned databases", 2},
// dump format options
{"schemaonly", 's', 0, 0, "Only dump schema.", 3},
{"with-property", 'M', 0, 0, "Dump schema with properties.", 3},
{"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3},
{"end-time", 'E', "END_TIME", 0, "End time to dump.", 3},
{"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3},
{"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
{"thread_num", 'T', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3},
{"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3},
{0}};
/* Used by main to communicate with parse_opt. */
struct arguments {
// connection option
char *host;
char *user;
char *password;
uint16_t port;
char cversion[12];
uint16_t mysqlFlag;
// output file
char outpath[TSDB_FILENAME_LEN+1];
char inpath[TSDB_FILENAME_LEN+1];
char *encode;
// dump unit option
bool all_databases;
bool databases;
// dump format option
bool schemaonly;
bool with_property;
int64_t start_time;
int64_t end_time;
int32_t data_batch;
int32_t table_batch; // num of table which will be dump into one output file.
bool allow_sys;
// other options
int32_t thread_num;
int abort;
char **arg_list;
int arg_list_len;
bool isDumpIn;
};
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
struct arguments *arguments = state->input;
wordexp_t full_path;
switch (key) {
// connection option
case 'a':
arguments->allow_sys = true;
break;
case 'h':
arguments->host = arg;
break;
case 'u':
arguments->user = arg;
break;
case 'p':
arguments->password = arg;
break;
case 'P':
arguments->port = atoi(arg);
break;
case 'q':
arguments->mysqlFlag = atoi(arg);
break;
case 'v':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid client vesion %s\n", arg);
return -1;
}
tstrncpy(arguments->cversion, full_path.we_wordv[0], 11);
wordfree(&full_path);
break;
// output file path
case 'o':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->outpath, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'i':
arguments->isDumpIn = true;
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->inpath, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
tstrncpy(configDir, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'e':
arguments->encode = arg;
break;
// dump unit option
case 'A':
arguments->all_databases = true;
break;
case 'B':
arguments->databases = true;
break;
// dump format option
case 's':
arguments->schemaonly = true;
break;
case 'M':
arguments->with_property = true;
break;
case 'S':
// parse time here.
arguments->start_time = atol(arg);
break;
case 'E':
arguments->end_time = atol(arg);
break;
case 'N':
arguments->data_batch = atoi(arg);
break;
case 't':
arguments->table_batch = atoi(arg);
break;
case 'T':
arguments->thread_num = atoi(arg);
break;
case OPT_ABORT:
arguments->abort = 1;
break;
case ARGP_KEY_ARG:
arguments->arg_list = &state->argv[state->next - 1];
arguments->arg_list_len = state->argc - state->next + 1;
state->next = state->argc;
break;
default:
return ARGP_ERR_UNKNOWN;
}
return 0;
}
/* Our argp parser. */
static struct argp argp = {options, parse_opt, args_doc, doc};
int taosDumpOut(struct arguments *arguments);
int taosDumpIn(struct arguments *arguments);
void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp);
int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *taosCon);
int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon);
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp);
void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, FILE *fp);
int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp, TAOS* taosCon);
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon);
int taosCheckParam(struct arguments *arguments);
void taosFreeDbInfos();
static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName);
struct arguments tsArguments = {
// connection option
NULL,
"root",
#ifdef _TD_POWER_
"powerdb",
#else
"taosdata",
#endif
0,
"",
0,
// outpath and inpath
"",
"",
NULL,
// dump unit option
false,
false,
// dump format option
false,
false,
0,
INT64_MAX,
1,
1,
false,
// other options
5,
0,
NULL,
0,
false
};
int queryDB(TAOS *taos, char *command) {
TAOS_RES *pSql = NULL;
int32_t code = -1;
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (code) {
fprintf(stderr, "sql error: %s, reason:%s\n", command, taos_errstr(pSql));
}
taos_free_result(pSql);
return code;
}
int main(int argc, char *argv[]) {
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
argp_parse(&argp, argc, argv, 0, 0, &tsArguments);
if (tsArguments.abort) {
#ifndef _ALPINE
error(10, 0, "ABORTED");
#else
abort();
#endif
}
printf("====== arguments config ======\n");
{
printf("host: %s\n", tsArguments.host);
printf("user: %s\n", tsArguments.user);
printf("password: %s\n", tsArguments.password);
printf("port: %u\n", tsArguments.port);
printf("cversion: %s\n", tsArguments.cversion);
printf("mysqlFlag: %d\n", tsArguments.mysqlFlag);
printf("outpath: %s\n", tsArguments.outpath);
printf("inpath: %s\n", tsArguments.inpath);
printf("encode: %s\n", tsArguments.encode);
printf("all_databases: %d\n", tsArguments.all_databases);
printf("databases: %d\n", tsArguments.databases);
printf("schemaonly: %d\n", tsArguments.schemaonly);
printf("with_property: %d\n", tsArguments.with_property);
printf("start_time: %" PRId64 "\n", tsArguments.start_time);
printf("end_time: %" PRId64 "\n", tsArguments.end_time);
printf("data_batch: %d\n", tsArguments.data_batch);
printf("table_batch: %d\n", tsArguments.table_batch);
printf("allow_sys: %d\n", tsArguments.allow_sys);
printf("abort: %d\n", tsArguments.abort);
printf("isDumpIn: %d\n", tsArguments.isDumpIn);
printf("arg_list_len: %d\n", tsArguments.arg_list_len);
for (int32_t i = 0; i < tsArguments.arg_list_len; i++) {
printf("arg_list[%d]: %s\n", i, tsArguments.arg_list[i]);
}
}
printf("==============================\n");
if (tsArguments.cversion[0] != 0){
tstrncpy(version, tsArguments.cversion, 11);
}
if (taosCheckParam(&tsArguments) < 0) {
exit(EXIT_FAILURE);
}
if (tsArguments.isDumpIn) {
if (taosDumpIn(&tsArguments) < 0) return -1;
} else {
if (taosDumpOut(&tsArguments) < 0) return -1;
}
return 0;
}
void taosFreeDbInfos() {
if (dbInfos == NULL) return;
for (int i = 0; i < 128; i++) tfree(dbInfos[i]);
tfree(dbInfos);
}
// check table is normal table or super table
int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo, TAOS *taosCon) {
TAOS_ROW row = NULL;
bool isSet = false;
TAOS_RES *result = NULL;
memset(pTableRecordInfo, 0, sizeof(STableRecordInfo));
char* tempCommand = (char *)malloc(COMMAND_SIZE);
if (tempCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return -1;
}
sprintf(tempCommand, "show tables like %s", table);
result = taos_query(taosCon, tempCommand);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tempCommand);
free(tempCommand);
taos_free_result(result);
return -1;
}
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isMetric = false;
strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
break;
}
taos_free_result(result);
result = NULL;
if (isSet) {
free(tempCommand);
return 0;
}
sprintf(tempCommand, "show stables like %s", table);
result = taos_query(taosCon, tempCommand);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tempCommand);
free(tempCommand);
taos_free_result(result);
return -1;
}
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isMetric = true;
tstrncpy(pTableRecordInfo->tableRecord.metric, table, TSDB_TABLE_NAME_LEN);
break;
}
taos_free_result(result);
result = NULL;
if (isSet) {
free(tempCommand);
return 0;
}
fprintf(stderr, "invalid table/metric %s\n", table);
free(tempCommand);
return -1;
}
int32_t taosSaveAllNormalTableToTempFile(TAOS *taosCon, char*meter, char* metric, int* fd) {
STableRecord tableRecord;
if (-1 == *fd) {
*fd = open(".tables.tmp.0", O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (*fd == -1) {
fprintf(stderr, "failed to open temp file: .tables.tmp.0\n");
return -1;
}
}
memset(&tableRecord, 0, sizeof(STableRecord));
tstrncpy(tableRecord.name, meter, TSDB_TABLE_NAME_LEN);
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
taosWrite(*fd, &tableRecord, sizeof(STableRecord));
return 0;
}
int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct arguments *arguments, int32_t* totalNumOfThread) {
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
char* tmpCommand = (char *)malloc(COMMAND_SIZE);
if (tmpCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return -1;
}
sprintf(tmpCommand, "select tbname from %s", metric);
TAOS_RES *result = taos_query(taosCon, tmpCommand);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tmpCommand);
free(tmpCommand);
taos_free_result(result);
return -1;
}
TAOS_FIELD *fields = taos_fetch_fields(result);
int32_t numOfTable = 0;
int32_t numOfThread = *totalNumOfThread;
char tmpFileName[TSDB_FILENAME_LEN + 1];
while ((row = taos_fetch_row(result)) != NULL) {
if (0 == numOfTable) {
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
sprintf(tmpFileName, ".tables.tmp.%d", numOfThread);
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName);
taos_free_result(result);
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
(void)remove(tmpFileName);
}
free(tmpCommand);
return -1;
}
numOfThread++;
}
memset(&tableRecord, 0, sizeof(STableRecord));
tstrncpy(tableRecord.name, (char *)row[0], fields[0].bytes);
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
taosWrite(fd, &tableRecord, sizeof(STableRecord));
numOfTable++;
if (numOfTable >= arguments->table_batch) {
numOfTable = 0;
close(fd);
fd = -1;
}
}
if (fd >= 0) {
close(fd);
fd = -1;
}
taos_free_result(result);
*totalNumOfThread = numOfThread;
free(tmpCommand);
return 0;
}
int taosDumpOut(struct arguments *arguments) {
TAOS *taos = NULL;
TAOS_RES *result = NULL;
char *command = NULL;
TAOS_ROW row;
FILE *fp = NULL;
int32_t count = 0;
STableRecordInfo tableRecordInfo;
char tmpBuf[TSDB_FILENAME_LEN+9] = {0};
if (arguments->outpath[0] != 0) {
sprintf(tmpBuf, "%s/dbs.sql", arguments->outpath);
} else {
sprintf(tmpBuf, "dbs.sql");
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
fprintf(stderr, "failed to open file %s\n", tmpBuf);
return -1;
}
dbInfos = (SDbInfo **)calloc(128, sizeof(SDbInfo *));
if (dbInfos == NULL) {
fprintf(stderr, "failed to allocate memory\n");
goto _exit_failure;
}
command = (char *)malloc(COMMAND_SIZE);
if (command == NULL) {
fprintf(stderr, "failed to allocate memory\n");
goto _exit_failure;
}
/* Connect to server */
taos = taos_connect(arguments->host, arguments->user, arguments->password, NULL, arguments->port);
if (taos == NULL) {
fprintf(stderr, "failed to connect to TDengine server\n");
goto _exit_failure;
}
/* --------------------------------- Main Code -------------------------------- */
/* if (arguments->databases || arguments->all_databases) { // dump part of databases or all databases */
/* */
taosDumpCharset(fp);
sprintf(command, "show databases");
result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(taos));
goto _exit_failure;
}
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result)) != NULL) {
// sys database name : 'monitor', but subsequent version changed to 'log'
if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "monitor", fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0 &&
(!arguments->allow_sys))
continue;
if (arguments->databases) { // input multi dbs
for (int i = 0; arguments->arg_list[i]; i++) {
if (strncasecmp(arguments->arg_list[i], (char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
goto _dump_db_point;
}
continue;
} else if (!arguments->all_databases) { // only input one db
if (strncasecmp(arguments->arg_list[0], (char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
goto _dump_db_point;
else
continue;
}
_dump_db_point:
dbInfos[count] = (SDbInfo *)calloc(1, sizeof(SDbInfo));
if (dbInfos[count] == NULL) {
fprintf(stderr, "failed to allocate memory\n");
goto _exit_failure;
}
strncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes);
#if 0
dbInfos[count]->replica = (int)(*((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]));
dbInfos[count]->days = (int)(*((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]));
dbInfos[count]->keep = *((int *)row[TSDB_SHOW_DB_KEEP_INDEX]);
dbInfos[count]->tables = *((int *)row[TSDB_SHOW_DB_TABLES_INDEX]);
dbInfos[count]->rows = *((int *)row[TSDB_SHOW_DB_ROWS_INDEX]);
dbInfos[count]->cache = *((int *)row[TSDB_SHOW_DB_CACHE_INDEX]);
dbInfos[count]->ablocks = *((int *)row[TSDB_SHOW_DB_ABLOCKS_INDEX]);
dbInfos[count]->tblocks = (int)(*((int16_t *)row[TSDB_SHOW_DB_TBLOCKS_INDEX]));
dbInfos[count]->ctime = *((int *)row[TSDB_SHOW_DB_CTIME_INDEX]);
dbInfos[count]->clog = (int)(*((int8_t *)row[TSDB_SHOW_DB_CLOG_INDEX]));
dbInfos[count]->comp = (int)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
#endif
count++;
if (arguments->databases) {
if (count > arguments->arg_list_len) break;
} else if (!arguments->all_databases) {
if (count >= 1) break;
}
}
if (count == 0) {
fprintf(stderr, "No databases valid to dump\n");
goto _exit_failure;
}
if (arguments->databases || arguments->all_databases) { // case: taosdump --databases dbx dby ... OR taosdump --all-databases
for (int i = 0; i < count; i++) {
taosDumpDb(dbInfos[i], arguments, fp, taos);
}
} else {
if (arguments->arg_list_len == 1) { // case: taosdump <db>
taosDumpDb(dbInfos[0], arguments, fp, taos);
} else { // case: taosdump <db> tablex tabley ...
taosDumpCreateDbClause(dbInfos[0], arguments->with_property, fp);
sprintf(command, "use %s", dbInfos[0]->name);
result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "invalid database %s\n", dbInfos[0]->name);
goto _exit_failure;
}
fprintf(fp, "USE %s;\n\n", dbInfos[0]->name);
int32_t totalNumOfThread = 1; // 0: all normal talbe into .tables.tmp.0
int normalTblFd = -1;
int32_t retCode;
for (int i = 1; arguments->arg_list[i]; i++) {
if (taosGetTableRecordInfo(arguments->arg_list[i], &tableRecordInfo, taos) < 0) {
fprintf(stderr, "input the invalide table %s\n", arguments->arg_list[i]);
continue;
}
if (tableRecordInfo.isMetric) { // dump all table of this metric
(void)taosDumpStable(tableRecordInfo.tableRecord.metric, fp, taos);
retCode = taosSaveTableOfMetricToTempFile(taos, tableRecordInfo.tableRecord.metric, arguments, &totalNumOfThread);
} else {
if (tableRecordInfo.tableRecord.metric[0] != '\0') { // dump this sub table and it's metric
(void)taosDumpStable(tableRecordInfo.tableRecord.metric, fp, taos);
}
retCode = taosSaveAllNormalTableToTempFile(taos, tableRecordInfo.tableRecord.name, tableRecordInfo.tableRecord.metric, &normalTblFd);
}
if (retCode < 0) {
if (-1 != normalTblFd){
taosClose(normalTblFd);
}
goto _clean_tmp_file;
}
}
if (-1 != normalTblFd){
taosClose(normalTblFd);
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads(arguments, totalNumOfThread, dbInfos[0]->name);
char tmpFileName[TSDB_FILENAME_LEN + 1];
_clean_tmp_file:
for (int loopCnt = 0; loopCnt < totalNumOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
remove(tmpFileName);
}
}
}
/* Close the handle and return */
fclose(fp);
taos_close(taos);
taos_free_result(result);
tfree(command);
taosFreeDbInfos();
fprintf(stderr, "dump out rows: %" PRId64 "\n", totalDumpOutRows);
return 0;
_exit_failure:
fclose(fp);
taos_close(taos);
taos_free_result(result);
tfree(command);
taosFreeDbInfos();
fprintf(stderr, "dump out rows: %" PRId64 "\n", totalDumpOutRows);
return -1;
}
int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon, bool isSuperTable) {
TAOS_ROW row = NULL;
TAOS_RES *tmpResult = NULL;
int count = 0;
char* tempCommand = (char *)malloc(COMMAND_SIZE);
if (tempCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return -1;
}
char* tbuf = (char *)malloc(COMMAND_SIZE);
if (tbuf == NULL) {
fprintf(stderr, "failed to allocate memory\n");
free(tempCommand);
return -1;
}
sprintf(tempCommand, "describe %s", table);
tmpResult = taos_query(taosCon, tempCommand);
int32_t code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tempCommand);
free(tempCommand);
free(tbuf);
taos_free_result(tmpResult);
return -1;
}
TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
tstrncpy(tableDes->name, table, TSDB_COL_NAME_LEN);
while ((row = taos_fetch_row(tmpResult)) != NULL) {
strncpy(tableDes->cols[count].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
strncpy(tableDes->cols[count].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
tableDes->cols[count].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
strncpy(tableDes->cols[count].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
count++;
}
taos_free_result(tmpResult);
tmpResult = NULL;
if (isSuperTable) {
free(tempCommand);
free(tbuf);
return count;
}
// if chidl-table have tag, using select tagName from table to get tagValue
for (int i = 0 ; i < count; i++) {
if (strcmp(tableDes->cols[i].note, "TAG") != 0) continue;
sprintf(tempCommand, "select %s from %s", tableDes->cols[i].field, table);
tmpResult = taos_query(taosCon, tempCommand);
code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tempCommand);
free(tempCommand);
free(tbuf);
taos_free_result(tmpResult);
return -1;
}
fields = taos_fetch_fields(tmpResult);
row = taos_fetch_row(tmpResult);
if (NULL == row) {
fprintf(stderr, " fetch failed to run command %s\n", tempCommand);
free(tempCommand);
free(tbuf);
taos_free_result(tmpResult);
return -1;
}
if (row[0] == NULL) {
sprintf(tableDes->cols[i].note, "%s", "NULL");
taos_free_result(tmpResult);
tmpResult = NULL;
continue;
}
int32_t* length = taos_fetch_lengths(tmpResult);
//int32_t* length = taos_fetch_lengths(tmpResult);
switch (fields[0].type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(tableDes->cols[i].note, "%d", ((((int32_t)(*((char *)row[0]))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(tableDes->cols[i].note, "%d", *((int8_t *)row[0]));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(tableDes->cols[i].note, "%d", *((int16_t *)row[0]));
break;
case TSDB_DATA_TYPE_INT:
sprintf(tableDes->cols[i].note, "%d", *((int32_t *)row[0]));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(tableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[0]));
break;
case TSDB_DATA_TYPE_FLOAT:
sprintf(tableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[0]));
break;
case TSDB_DATA_TYPE_DOUBLE:
sprintf(tableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[0]));
break;
case TSDB_DATA_TYPE_BINARY:
memset(tableDes->cols[i].note, 0, sizeof(tableDes->cols[i].note));
tableDes->cols[i].note[0] = '\'';
converStringToReadable((char *)row[0], length[0], tbuf, COMMAND_SIZE);
char* pstr = stpcpy(&(tableDes->cols[i].note[1]), tbuf);
*(pstr++) = '\'';
break;
case TSDB_DATA_TYPE_NCHAR:
memset(tableDes->cols[i].note, 0, sizeof(tableDes->cols[i].note));
convertNCharToReadable((char *)row[0], length[0], tbuf, COMMAND_SIZE);
sprintf(tableDes->cols[i].note, "\'%s\'", tbuf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
#if 0
if (!arguments->mysqlFlag) {
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[0]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
#endif
break;
default:
break;
}
taos_free_result(tmpResult);
tmpResult = NULL;
}
free(tempCommand);
free(tbuf);
return count;
}
int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp, TAOS* taosCon) {
int count = 0;
STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
if (metric != NULL && metric[0] != '\0') { // dump table schema which is created by using super table
/*
count = taosGetTableDes(metric, tableDes, taosCon);
if (count < 0) {
free(tableDes);
return -1;
}
taosDumpCreateTableClause(tableDes, count, fp);
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/
count = taosGetTableDes(table, tableDes, taosCon, false);
if (count < 0) {
free(tableDes);
return -1;
}
// create child-table using super-table
taosDumpCreateMTableClause(tableDes, metric, count, fp);
} else { // dump table definition
count = taosGetTableDes(table, tableDes, taosCon, false);
if (count < 0) {
free(tableDes);
return -1;
}
// create normal-table or super-table
taosDumpCreateTableClause(tableDes, count, fp);
}
free(tableDes);
return taosDumpTableData(fp, table, arguments, taosCon);
}
void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
char* tmpCommand = (char *)malloc(COMMAND_SIZE);
if (tmpCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return;
}
char *pstr = tmpCommand;
pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s", dbInfo->name);
if (isDumpProperty) {
pstr += sprintf(pstr,
" REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d",
dbInfo->replica, dbInfo->days, dbInfo->keep, dbInfo->tables, dbInfo->rows, dbInfo->cache,
dbInfo->ablocks, dbInfo->tblocks, dbInfo->ctime, dbInfo->clog, dbInfo->comp);
}
pstr += sprintf(pstr, ";");
fprintf(fp, "%s\n\n", tmpCommand);
free(tmpCommand);
}
void* taosDumpOutWorkThreadFp(void *arg)
{
SThreadParaObj *pThread = (SThreadParaObj*)arg;
STableRecord tableRecord;
int fd;
char tmpFileName[TSDB_FILENAME_LEN*4] = {0};
sprintf(tmpFileName, ".tables.tmp.%d", pThread->threadIndex);
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpFileName);
return NULL;
}
FILE *fp = NULL;
memset(tmpFileName, 0, TSDB_FILENAME_LEN + 128);
if (tsArguments.outpath[0] != 0) {
sprintf(tmpFileName, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex);
} else {
sprintf(tmpFileName, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex);
}
fp = fopen(tmpFileName, "w");
if (fp == NULL) {
fprintf(stderr, "failed to open file %s\n", tmpFileName);
close(fd);
return NULL;
}
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
sprintf(tmpFileName, "use %s", pThread->dbName);
TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpFileName);
int32_t code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "invalid database %s\n", pThread->dbName);
taos_free_result(tmpResult);
fclose(fp);
close(fd);
return NULL;
}
fprintf(fp, "USE %s;\n\n", pThread->dbName);
while (1) {
ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord));
if (readLen <= 0) break;
taosDumpTable(tableRecord.name, tableRecord.metric, &tsArguments, fp, pThread->taosCon);
}
taos_free_result(tmpResult);
close(fd);
fclose(fp);
return NULL;
}
static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName)
{
pthread_attr_t thattr;
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj));
for (int t = 0; t < numOfThread; ++t) {
SThreadParaObj *pThread = threadObj + t;
pThread->threadIndex = t;
pThread->totalThreads = numOfThread;
tstrncpy(pThread->dbName, dbName, TSDB_TABLE_NAME_LEN);
pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port);
if (pThread->taosCon == NULL) {
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taosCon));
exit(0);
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr, taosDumpOutWorkThreadFp, (void*)pThread) != 0) {
fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
exit(0);
}
}
for (int32_t t = 0; t < numOfThread; ++t) {
pthread_join(threadObj[t].threadID, NULL);
}
for (int32_t t = 0; t < numOfThread; ++t) {
taos_close(threadObj[t].taosCon);
}
free(threadObj);
}
int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon) {
int count = 0;
STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
if (NULL == tableDes) {
fprintf(stderr, "failed to allocate memory\n");
exit(-1);
}
count = taosGetTableDes(table, tableDes, taosCon, true);
if (count < 0) {
free(tableDes);
fprintf(stderr, "failed to get stable schema\n");
exit(-1);
}
taosDumpCreateTableClause(tableDes, count, fp);
free(tableDes);
return 0;
}
int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
{
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
char* tmpCommand = (char *)malloc(COMMAND_SIZE);
if (tmpCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
exit(-1);
}
sprintf(tmpCommand, "use %s", dbName);
TAOS_RES* tmpResult = taos_query(taosCon, tmpCommand);
int32_t code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "invalid database %s, error: %s\n", dbName, taos_errstr(taosCon));
free(tmpCommand);
taos_free_result(tmpResult);
exit(-1);
}
taos_free_result(tmpResult);
sprintf(tmpCommand, "show stables");
tmpResult = taos_query(taosCon, tmpCommand);
code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", tmpCommand, taos_errstr(taosCon));
free(tmpCommand);
taos_free_result(tmpResult);
exit(-1);
}
TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
char tmpFileName[TSDB_FILENAME_LEN + 1];
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
sprintf(tmpFileName, ".stables.tmp");
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName);
taos_free_result(tmpResult);
free(tmpCommand);
(void)remove(".stables.tmp");
exit(-1);
}
while ((row = taos_fetch_row(tmpResult)) != NULL) {
memset(&tableRecord, 0, sizeof(STableRecord));
strncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
taosWrite(fd, &tableRecord, sizeof(STableRecord));
}
taos_free_result(tmpResult);
(void)lseek(fd, 0, SEEK_SET);
while (1) {
ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord));
if (readLen <= 0) break;
(void)taosDumpStable(tableRecord.name, fp, taosCon);
}
close(fd);
(void)remove(".stables.tmp");
free(tmpCommand);
return 0;
}
int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *taosCon) {
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
taosDumpCreateDbClause(dbInfo, arguments->with_property, fp);
char* tmpCommand = (char *)malloc(COMMAND_SIZE);
if (tmpCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return -1;
}
sprintf(tmpCommand, "use %s", dbInfo->name);
TAOS_RES* tmpResult = taos_query(taosCon, tmpCommand);
int32_t code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "invalid database %s\n", dbInfo->name);
free(tmpCommand);
taos_free_result(tmpResult);
return -1;
}
taos_free_result(tmpResult);
fprintf(fp, "USE %s;\n\n", dbInfo->name);
(void)taosDumpCreateSuperTableClause(taosCon, dbInfo->name, fp);
sprintf(tmpCommand, "show tables");
tmpResult = taos_query(taosCon, tmpCommand);
code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tmpCommand);
free(tmpCommand);
taos_free_result(tmpResult);
return -1;
}
TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
int32_t numOfTable = 0;
int32_t numOfThread = 0;
char tmpFileName[TSDB_FILENAME_LEN + 1];
while ((row = taos_fetch_row(tmpResult)) != NULL) {
if (0 == numOfTable) {
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
sprintf(tmpFileName, ".tables.tmp.%d", numOfThread);
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName);
taos_free_result(tmpResult);
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
(void)remove(tmpFileName);
}
free(tmpCommand);
return -1;
}
numOfThread++;
}
memset(&tableRecord, 0, sizeof(STableRecord));
tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
tstrncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
taosWrite(fd, &tableRecord, sizeof(STableRecord));
numOfTable++;
if (numOfTable >= arguments->table_batch) {
numOfTable = 0;
close(fd);
fd = -1;
}
}
if (fd >= 0) {
close(fd);
fd = -1;
}
taos_free_result(tmpResult);
// start multi threads to dumpout
taosStartDumpOutWorkThreads(arguments, numOfThread, dbInfo->name);
for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
(void)remove(tmpFileName);
}
free(tmpCommand);
return 0;
}
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp) {
int counter = 0;
int count_temp = 0;
char* tmpBuf = (char *)malloc(COMMAND_SIZE);
if (tmpBuf == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return;
}
char* pstr = tmpBuf;
pstr += sprintf(tmpBuf, "CREATE TABLE IF NOT EXISTS %s", tableDes->name);
for (; counter < numOfCols; counter++) {
if (tableDes->cols[counter].note[0] != '\0') break;
if (counter == 0) {
pstr += sprintf(pstr, " (%s %s", tableDes->cols[counter].field, tableDes->cols[counter].type);
} else {
pstr += sprintf(pstr, ", %s %s", tableDes->cols[counter].field, tableDes->cols[counter].type);
}
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length);
}
}
count_temp = counter;
for (; counter < numOfCols; counter++) {
if (counter == count_temp) {
pstr += sprintf(pstr, ") TAGS (%s %s", tableDes->cols[counter].field, tableDes->cols[counter].type);
} else {
pstr += sprintf(pstr, ", %s %s", tableDes->cols[counter].field, tableDes->cols[counter].type);
}
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length);
}
}
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
free(tmpBuf);
}
void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, FILE *fp) {
int counter = 0;
int count_temp = 0;
char* tmpBuf = (char *)malloc(COMMAND_SIZE);
if (tmpBuf == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return;
}
char *pstr = NULL;
pstr = tmpBuf;
pstr += sprintf(tmpBuf, "CREATE TABLE IF NOT EXISTS %s USING %s TAGS (", tableDes->name, metric);
for (; counter < numOfCols; counter++) {
if (tableDes->cols[counter].note[0] != '\0') break;
}
assert(counter < numOfCols);
count_temp = counter;
for (; counter < numOfCols; counter++) {
if (counter != count_temp) {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
}
} else {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
}
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
free(tmpBuf);
}
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon) {
/* char temp[MAX_COMMAND_SIZE] = "\0"; */
int64_t totalRows = 0;
int count = 0;
char *pstr = NULL;
TAOS_ROW row = NULL;
int numFields = 0;
char *tbuf = NULL;
char* tmpCommand = (char *)calloc(1, COMMAND_SIZE);
if (tmpCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return -1;
}
char* tmpBuffer = (char *)calloc(1, COMMAND_SIZE);
if (tmpBuffer == NULL) {
fprintf(stderr, "failed to allocate memory\n");
free(tmpCommand);
return -1;
}
pstr = tmpBuffer;
if (arguments->schemaonly) {
free(tmpCommand);
free(tmpBuffer);
return 0;
}
sprintf(tmpCommand,
"select * from %s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc",
tbname,
arguments->start_time,
arguments->end_time);
TAOS_RES* tmpResult = taos_query(taosCon, tmpCommand);
int32_t code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "failed to run command %s, reason: %s\n", tmpCommand, taos_errstr(taosCon));
free(tmpCommand);
free(tmpBuffer);
taos_free_result(tmpResult);
return -1;
}
numFields = taos_field_count(tmpResult);
assert(numFields > 0);
TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
tbuf = (char *)malloc(COMMAND_SIZE);
if (tbuf == NULL) {
fprintf(stderr, "No enough memory\n");
free(tmpCommand);
free(tmpBuffer);
taos_free_result(tmpResult);
return -1;
}
char sqlStr[8] = "\0";
if (arguments->mysqlFlag) {
sprintf(sqlStr, "INSERT");
} else {
sprintf(sqlStr, "IMPORT");
}
int rowFlag = 0;
count = 0;
while ((row = taos_fetch_row(tmpResult)) != NULL) {
pstr = tmpBuffer;
int32_t* length = taos_fetch_lengths(tmpResult); // act len
if (count == 0) {
pstr += sprintf(pstr, "%s INTO %s VALUES (", sqlStr, tbname);
} else {
if (arguments->mysqlFlag) {
if (0 == rowFlag) {
pstr += sprintf(pstr, "(");
rowFlag++;
} else {
pstr += sprintf(pstr, ", (");
}
} else {
pstr += sprintf(pstr, "(");
}
}
for (int col = 0; col < numFields; col++) {
if (col != 0) pstr += sprintf(pstr, ", ");
if (row[col] == NULL) {
pstr += sprintf(pstr, "NULL");
continue;
}
switch (fields[col].type) {
case TSDB_DATA_TYPE_BOOL:
pstr += sprintf(pstr, "%d", ((((int32_t)(*((char *)row[col]))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
pstr += sprintf(pstr, "%d", *((int8_t *)row[col]));
break;
case TSDB_DATA_TYPE_SMALLINT:
pstr += sprintf(pstr, "%d", *((int16_t *)row[col]));
break;
case TSDB_DATA_TYPE_INT:
pstr += sprintf(pstr, "%d", *((int32_t *)row[col]));
break;
case TSDB_DATA_TYPE_BIGINT:
pstr += sprintf(pstr, "%" PRId64 "", *((int64_t *)row[col]));
break;
case TSDB_DATA_TYPE_FLOAT:
pstr += sprintf(pstr, "%f", GET_FLOAT_VAL(row[col]));
break;
case TSDB_DATA_TYPE_DOUBLE:
pstr += sprintf(pstr, "%f", GET_DOUBLE_VAL(row[col]));
break;
case TSDB_DATA_TYPE_BINARY:
*(pstr++) = '\'';
converStringToReadable((char *)row[col], length[col], tbuf, COMMAND_SIZE);
pstr = stpcpy(pstr, tbuf);
*(pstr++) = '\'';
break;
case TSDB_DATA_TYPE_NCHAR:
convertNCharToReadable((char *)row[col], length[col], tbuf, COMMAND_SIZE);
pstr += sprintf(pstr, "\'%s\'", tbuf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (!arguments->mysqlFlag) {
pstr += sprintf(pstr, "%" PRId64 "", *(int64_t *)row[col]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[col]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
pstr += sprintf(pstr, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
break;
default:
break;
}
}
pstr += sprintf(pstr, ") ");
totalRows++;
count++;
fprintf(fp, "%s", tmpBuffer);
if (count >= arguments->data_batch) {
fprintf(fp, ";\n");
count = 0;
} //else {
//fprintf(fp, "\\\n");
//}
}
atomic_add_fetch_64(&totalDumpOutRows, totalRows);
fprintf(fp, "\n");
if (tbuf) {
free(tbuf);
}
taos_free_result(tmpResult);
tmpResult = NULL;
free(tmpCommand);
free(tmpBuffer);
return 0;
}
int taosCheckParam(struct arguments *arguments) {
if (arguments->all_databases && arguments->databases) {
fprintf(stderr, "conflict option --all-databases and --databases\n");
return -1;
}
if (arguments->start_time > arguments->end_time) {
fprintf(stderr, "start time is larger than end time\n");
return -1;
}
if (arguments->arg_list_len == 0) {
if ((!arguments->all_databases) && (!arguments->isDumpIn)) {
fprintf(stderr, "taosdump requires parameters\n");
return -1;
}
}
/*
if (arguments->isDumpIn && (strcmp(arguments->outpath, DEFAULT_DUMP_FILE) != 0)) {
fprintf(stderr, "duplicate parameter input and output file path\n");
return -1;
}
*/
if (!arguments->isDumpIn && arguments->encode != NULL) {
fprintf(stderr, "invalid option in dump out\n");
return -1;
}
if (arguments->table_batch <= 0) {
fprintf(stderr, "invalid option in dump out\n");
return -1;
}
return 0;
}
bool isEmptyCommand(char *cmd) {
char *pchar = cmd;
while (*pchar != '\0') {
if (*pchar != ' ') return false;
pchar++;
}
return true;
}
void taosReplaceCtrlChar(char *str) {
_Bool ctrlOn = false;
char *pstr = NULL;
for (pstr = str; *str != '\0'; ++str) {
if (ctrlOn) {
switch (*str) {
case 'n':
*pstr = '\n';
pstr++;
break;
case 'r':
*pstr = '\r';
pstr++;
break;
case 't':
*pstr = '\t';
pstr++;
break;
case '\\':
*pstr = '\\';
pstr++;
break;
case '\'':
*pstr = '\'';
pstr++;
break;
default:
break;
}
ctrlOn = false;
} else {
if (*str == '\\') {
ctrlOn = true;
} else {
*pstr = *str;
pstr++;
}
}
}
*pstr = '\0';
}
char *ascii_literal_list[] = {
"\\x00", "\\x01", "\\x02", "\\x03", "\\x04", "\\x05", "\\x06", "\\x07", "\\x08", "\\t", "\\n", "\\x0b", "\\x0c",
"\\r", "\\x0e", "\\x0f", "\\x10", "\\x11", "\\x12", "\\x13", "\\x14", "\\x15", "\\x16", "\\x17", "\\x18", "\\x19",
"\\x1a", "\\x1b", "\\x1c", "\\x1d", "\\x1e", "\\x1f", " ", "!", "\\\"", "#", "$", "%", "&",
"\\'", "(", ")", "*", "+", ",", "-", ".", "/", "0", "1", "2", "3",
"4", "5", "6", "7", "8", "9", ":", ";", "<", "=", ">", "?", "@",
"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M",
"N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z",
"[", "\\\\", "]", "^", "_", "`", "a", "b", "c", "d", "e", "f", "g",
"h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t",
"u", "v", "w", "x", "y", "z", "{", "|", "}", "~", "\\x7f", "\\x80", "\\x81",
"\\x82", "\\x83", "\\x84", "\\x85", "\\x86", "\\x87", "\\x88", "\\x89", "\\x8a", "\\x8b", "\\x8c", "\\x8d", "\\x8e",
"\\x8f", "\\x90", "\\x91", "\\x92", "\\x93", "\\x94", "\\x95", "\\x96", "\\x97", "\\x98", "\\x99", "\\x9a", "\\x9b",
"\\x9c", "\\x9d", "\\x9e", "\\x9f", "\\xa0", "\\xa1", "\\xa2", "\\xa3", "\\xa4", "\\xa5", "\\xa6", "\\xa7", "\\xa8",
"\\xa9", "\\xaa", "\\xab", "\\xac", "\\xad", "\\xae", "\\xaf", "\\xb0", "\\xb1", "\\xb2", "\\xb3", "\\xb4", "\\xb5",
"\\xb6", "\\xb7", "\\xb8", "\\xb9", "\\xba", "\\xbb", "\\xbc", "\\xbd", "\\xbe", "\\xbf", "\\xc0", "\\xc1", "\\xc2",
"\\xc3", "\\xc4", "\\xc5", "\\xc6", "\\xc7", "\\xc8", "\\xc9", "\\xca", "\\xcb", "\\xcc", "\\xcd", "\\xce", "\\xcf",
"\\xd0", "\\xd1", "\\xd2", "\\xd3", "\\xd4", "\\xd5", "\\xd6", "\\xd7", "\\xd8", "\\xd9", "\\xda", "\\xdb", "\\xdc",
"\\xdd", "\\xde", "\\xdf", "\\xe0", "\\xe1", "\\xe2", "\\xe3", "\\xe4", "\\xe5", "\\xe6", "\\xe7", "\\xe8", "\\xe9",
"\\xea", "\\xeb", "\\xec", "\\xed", "\\xee", "\\xef", "\\xf0", "\\xf1", "\\xf2", "\\xf3", "\\xf4", "\\xf5", "\\xf6",
"\\xf7", "\\xf8", "\\xf9", "\\xfa", "\\xfb", "\\xfc", "\\xfd", "\\xfe", "\\xff"};
int converStringToReadable(char *str, int size, char *buf, int bufsize) {
char *pstr = str;
char *pbuf = buf;
while (size > 0) {
if (*pstr == '\0') break;
pbuf = stpcpy(pbuf, ascii_literal_list[((uint8_t)(*pstr))]);
pstr++;
size--;
}
*pbuf = '\0';
return 0;
}
int convertNCharToReadable(char *str, int size, char *buf, int bufsize) {
char *pstr = str;
char *pbuf = buf;
// TODO
wchar_t wc;
while (size > 0) {
if (*pstr == '\0') break;
int byte_width = mbtowc(&wc, pstr, MB_CUR_MAX);
if (byte_width < 0) {
fprintf(stderr, "mbtowc() return fail.\n");
exit(-1);
}
if ((int)wc < 256) {
pbuf = stpcpy(pbuf, ascii_literal_list[(int)wc]);
} else {
memcpy(pbuf, pstr, byte_width);
pbuf += byte_width;
}
pstr += byte_width;
}
*pbuf = '\0';
return 0;
}
void taosDumpCharset(FILE *fp) {
char charsetline[256];
(void)fseek(fp, 0, SEEK_SET);
sprintf(charsetline, "#!%s\n", tsCharset);
(void)fwrite(charsetline, strlen(charsetline), 1, fp);
}
void taosLoadFileCharset(FILE *fp, char *fcharset) {
char * line = NULL;
size_t line_size = 0;
(void)fseek(fp, 0, SEEK_SET);
ssize_t size = getline(&line, &line_size, fp);
if (size <= 2) {
goto _exit_no_charset;
}
if (strncmp(line, "#!", 2) != 0) {
goto _exit_no_charset;
}
if (line[size - 1] == '\n') {
line[size - 1] = '\0';
size--;
}
strcpy(fcharset, line + 2);
tfree(line);
return;
_exit_no_charset:
(void)fseek(fp, 0, SEEK_SET);
*fcharset = '\0';
tfree(line);
return;
}
// ======== dumpIn support multi threads functions ================================//
static char **tsDumpInSqlFiles = NULL;
static int32_t tsSqlFileNum = 0;
static char tsDbSqlFile[TSDB_FILENAME_LEN] = {0};
static char tsfCharset[64] = {0};
static int taosGetFilesNum(const char *directoryName, const char *prefix)
{
char cmd[1024] = { 0 };
sprintf(cmd, "ls %s/*.%s | wc -l ", directoryName, prefix);
FILE *fp = popen(cmd, "r");
if (fp == NULL) {
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
exit(0);
}
int fileNum = 0;
if (fscanf(fp, "%d", &fileNum) != 1) {
fprintf(stderr, "ERROR: failed to execute:%s, parse result error\n", cmd);
exit(0);
}
if (fileNum <= 0) {
fprintf(stderr, "ERROR: directory:%s is empry\n", directoryName);
exit(0);
}
pclose(fp);
return fileNum;
}
static void taosParseDirectory(const char *directoryName, const char *prefix, char **fileArray, int totalFiles)
{
char cmd[1024] = { 0 };
sprintf(cmd, "ls %s/*.%s | sort", directoryName, prefix);
FILE *fp = popen(cmd, "r");
if (fp == NULL) {
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
exit(0);
}
int fileNum = 0;
while (fscanf(fp, "%128s", fileArray[fileNum++])) {
if (strcmp(fileArray[fileNum-1], tsDbSqlFile) == 0) {
fileNum--;
}
if (fileNum >= totalFiles) {
break;
}
}
if (fileNum != totalFiles) {
fprintf(stderr, "ERROR: directory:%s changed while read\n", directoryName);
pclose(fp);
exit(0);
}
pclose(fp);
}
static void taosCheckTablesSQLFile(const char *directoryName)
{
char cmd[1024] = { 0 };
sprintf(cmd, "ls %s/dbs.sql", directoryName);
FILE *fp = popen(cmd, "r");
if (fp == NULL) {
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
exit(0);
}
while (fscanf(fp, "%128s", tsDbSqlFile)) {
break;
}
pclose(fp);
}
static void taosMallocSQLFiles()
{
tsDumpInSqlFiles = (char**)calloc(tsSqlFileNum, sizeof(char*));
for (int i = 0; i < tsSqlFileNum; i++) {
tsDumpInSqlFiles[i] = calloc(1, TSDB_FILENAME_LEN);
}
}
static void taosFreeSQLFiles()
{
for (int i = 0; i < tsSqlFileNum; i++) {
tfree(tsDumpInSqlFiles[i]);
}
tfree(tsDumpInSqlFiles);
}
static void taosGetDirectoryFileList(char *inputDir)
{
struct stat fileStat;
if (stat(inputDir, &fileStat) < 0) {
fprintf(stderr, "ERROR: %s not exist\n", inputDir);
exit(0);
}
if (fileStat.st_mode & S_IFDIR) {
taosCheckTablesSQLFile(inputDir);
tsSqlFileNum = taosGetFilesNum(inputDir, "sql");
int totalSQLFileNum = tsSqlFileNum;
if (tsDbSqlFile[0] != 0) {
tsSqlFileNum--;
}
taosMallocSQLFiles();
taosParseDirectory(inputDir, "sql", tsDumpInSqlFiles, tsSqlFileNum);
fprintf(stdout, "\nstart to dispose %d files in %s\n", totalSQLFileNum, inputDir);
}
else {
fprintf(stderr, "ERROR: %s is not a directory\n", inputDir);
exit(0);
}
}
static FILE* taosOpenDumpInFile(char *fptr) {
wordexp_t full_path;
if (wordexp(fptr, &full_path, 0) != 0) {
fprintf(stderr, "ERROR: illegal file name: %s\n", fptr);
return NULL;
}
char *fname = full_path.we_wordv[0];
FILE *f = fopen(fname, "r");
if (f == NULL) {
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
wordfree(&full_path);
return NULL;
}
wordfree(&full_path);
return f;
}
int taosDumpInOneFile_old(TAOS * taos, FILE* fp, char* fcharset, char* encode) {
char *command = NULL;
char *lcommand = NULL;
int tsize = 0;
char *line = NULL;
_Bool isRun = true;
size_t line_size = 0;
char *pstr = NULL;
char *lstr = NULL;
size_t inbytesleft = 0;
size_t outbytesleft = COMMAND_SIZE;
char *tcommand = NULL;
char *charsetOfFile = NULL;
iconv_t cd = (iconv_t)(-1);
command = (char *)malloc(COMMAND_SIZE);
lcommand = (char *)malloc(COMMAND_SIZE);
if (command == NULL || lcommand == NULL) {
fprintf(stderr, "failed to connect to allocate memory\n");
goto _dumpin_exit_failure;
}
// Resolve locale
if (*fcharset != '\0') {
charsetOfFile = fcharset;
} else {
charsetOfFile = encode;
}
if (charsetOfFile != NULL && strcasecmp(tsCharset, charsetOfFile) != 0) {
cd = iconv_open(tsCharset, charsetOfFile);
if (cd == ((iconv_t)(-1))) {
fprintf(stderr, "Failed to open iconv handle\n");
goto _dumpin_exit_failure;
}
}
pstr = command;
int64_t linenu = 0;
while (1) {
ssize_t size = getline(&line, &line_size, fp);
linenu++;
if (size <= 0) break;
if (size == 1) {
if (pstr != command) {
inbytesleft = pstr - command;
memset(lcommand, 0, COMMAND_SIZE);
pstr = command;
lstr = lcommand;
outbytesleft = COMMAND_SIZE;
if (cd != ((iconv_t)(-1))) {
iconv(cd, &pstr, &inbytesleft, &lstr, &outbytesleft);
tcommand = lcommand;
} else {
tcommand = command;
}
taosReplaceCtrlChar(tcommand);
if (queryDB(taos, tcommand) != 0) {
fprintf(stderr, "error sql: linenu: %" PRId64 " failed\n", linenu);
exit(0);
}
pstr = command;
pstr[0] = '\0';
tsize = 0;
isRun = true;
}
continue;
}
/* if (line[0] == '-' && line[1] == '-') continue; */
line[size - 1] = 0;
if (tsize + size - 1 > COMMAND_SIZE) {
fprintf(stderr, "command is too long\n");
goto _dumpin_exit_failure;
}
if (line[size - 2] == '\\') {
line[size - 2] = ' ';
isRun = false;
} else {
isRun = true;
}
memcpy(pstr, line, size - 1);
pstr += (size - 1);
*pstr = '\0';
if (!isRun) continue;
if (command != pstr && !isEmptyCommand(command)) {
inbytesleft = pstr - command;
memset(lcommand, 0, COMMAND_SIZE);
pstr = command;
lstr = lcommand;
outbytesleft = COMMAND_SIZE;
if (cd != ((iconv_t)(-1))) {
iconv(cd, &pstr, &inbytesleft, &lstr, &outbytesleft);
tcommand = lcommand;
} else {
tcommand = command;
}
taosReplaceCtrlChar(tcommand);
if (queryDB(taos, tcommand) != 0) {
fprintf(stderr, "error sql: linenu:%" PRId64 " failed\n", linenu);
exit(0);
}
}
pstr = command;
*pstr = '\0';
tsize = 0;
}
if (pstr != command) {
inbytesleft = pstr - command;
memset(lcommand, 0, COMMAND_SIZE);
pstr = command;
lstr = lcommand;
outbytesleft = COMMAND_SIZE;
if (cd != ((iconv_t)(-1))) {
iconv(cd, &pstr, &inbytesleft, &lstr, &outbytesleft);
tcommand = lcommand;
} else {
tcommand = command;
}
taosReplaceCtrlChar(lcommand);
if (queryDB(taos, tcommand) != 0)
fprintf(stderr, "error sql: linenu:%" PRId64 " failed \n", linenu);
}
if (cd != ((iconv_t)(-1))) iconv_close(cd);
tfree(line);
tfree(command);
tfree(lcommand);
taos_close(taos);
fclose(fp);
return 0;
_dumpin_exit_failure:
if (cd != ((iconv_t)(-1))) iconv_close(cd);
tfree(command);
tfree(lcommand);
taos_close(taos);
fclose(fp);
return -1;
}
int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, char* fileName) {
int read_len = 0;
char * cmd = NULL;
size_t cmd_len = 0;
char * line = NULL;
size_t line_len = 0;
cmd = (char *)malloc(COMMAND_SIZE);
if (cmd == NULL) {
fprintf(stderr, "failed to allocate memory\n");
return -1;
}
int lineNo = 0;
while ((read_len = getline(&line, &line_len, fp)) != -1) {
++lineNo;
if (read_len >= COMMAND_SIZE) continue;
line[--read_len] = '\0';
//if (read_len == 0 || isCommentLine(line)) { // line starts with #
if (read_len == 0 ) {
continue;
}
if (line[read_len - 1] == '\\') {
line[read_len - 1] = ' ';
memcpy(cmd + cmd_len, line, read_len);
cmd_len += read_len;
continue;
}
memcpy(cmd + cmd_len, line, read_len);
cmd[read_len + cmd_len]= '\0';
if (queryDB(taos, cmd)) {
fprintf(stderr, "error sql: linenu:%d, file:%s\n", lineNo, fileName);
}
memset(cmd, 0, COMMAND_SIZE);
cmd_len = 0;
}
tfree(cmd);
tfree(line);
fclose(fp);
return 0;
}
void* taosDumpInWorkThreadFp(void *arg)
{
SThreadParaObj *pThread = (SThreadParaObj*)arg;
for (int32_t f = 0; f < tsSqlFileNum; ++f) {
if (f % pThread->totalThreads == pThread->threadIndex) {
char *SQLFileName = tsDumpInSqlFiles[f];
FILE* fp = taosOpenDumpInFile(SQLFileName);
if (NULL == fp) {
continue;
}
fprintf(stderr, "Success Open input file: %s\n", SQLFileName);
taosDumpInOneFile(pThread->taosCon, fp, tsfCharset, tsArguments.encode, SQLFileName);
}
}
return NULL;
}
static void taosStartDumpInWorkThreads(struct arguments *args)
{
pthread_attr_t thattr;
SThreadParaObj *pThread;
int32_t totalThreads = args->thread_num;
if (totalThreads > tsSqlFileNum) {
totalThreads = tsSqlFileNum;
}
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(totalThreads, sizeof(SThreadParaObj));
for (int32_t t = 0; t < totalThreads; ++t) {
pThread = threadObj + t;
pThread->threadIndex = t;
pThread->totalThreads = totalThreads;
pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port);
if (pThread->taosCon == NULL) {
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taosCon));
exit(0);
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr, taosDumpInWorkThreadFp, (void*)pThread) != 0) {
fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
exit(0);
}
}
for (int t = 0; t < totalThreads; ++t) {
pthread_join(threadObj[t].threadID, NULL);
}
for (int t = 0; t < totalThreads; ++t) {
taos_close(threadObj[t].taosCon);
}
free(threadObj);
}
int taosDumpIn(struct arguments *arguments) {
assert(arguments->isDumpIn);
TAOS *taos = NULL;
FILE *fp = NULL;
taos = taos_connect(arguments->host, arguments->user, arguments->password, NULL, arguments->port);
if (taos == NULL) {
fprintf(stderr, "failed to connect to TDengine server\n");
return -1;
}
taosGetDirectoryFileList(arguments->inpath);
if (tsDbSqlFile[0] != 0) {
fp = taosOpenDumpInFile(tsDbSqlFile);
if (NULL == fp) {
fprintf(stderr, "failed to open input file %s\n", tsDbSqlFile);
return -1;
}
fprintf(stderr, "Success Open input file: %s\n", tsDbSqlFile);
taosLoadFileCharset(fp, tsfCharset);
taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile);
}
taosStartDumpInWorkThreads(arguments);
taos_close(taos);
taosFreeSQLFiles();
return 0;
}
taos1_6="/root/mnt/work/test/td1.6/build/bin/taos"
taosdump1_6="/root/mnt/work/test/td1.6/build/bin/taosdump"
taoscfg1_6="/root/mnt/work/test/td1.6/test/cfg"
taos2_0="/root/mnt/work/test/td2.0/build/bin/taos"
taosdump2_0="/root/mnt/work/test/td2.0/build/bin/taosdump"
taoscfg2_0="/root/mnt/work/test/td2.0/test/cfg"
data_dir="/root/mnt/work/test/td1.6/output"
table_list="/root/mnt/work/test/td1.6/tables"
DBNAME="test"
NTABLES=$(wc -l ${table_list} | awk '{print $1;}')
NTABLES_PER_DUMP=101
mkdir -p ${data_dir}
i=0
round=0
command="${taosdump1_6} -c ${taoscfg1_6} -o ${data_dir} -N 100 -T 20 ${DBNAME}"
while IFS= read -r line
do
i=$((i+1))
command="${command} ${line}"
if [[ "$i" -eq ${NTABLES_PER_DUMP} ]]; then
round=$((round+1))
echo "Starting round ${round} dump out..."
rm -f ${data_dir}/*
${command}
echo "Starting round ${round} dump in..."
${taosdump2_0} -c ${taoscfg2_0} -i ${data_dir}
# Reset variables
# command="${taosdump1_6} -c ${taoscfg1_6} -o ${data_dir} -N 100 ${DBNAME}"
command="${taosdump1_6} -c ${taoscfg1_6} -o ${data_dir} -N 100 -T 20 ${DBNAME}"
i=0
fi
done < "${table_list}"
if [[ ${i} -ne "0" ]]; then
round=$((round+1))
echo "Starting round ${round} dump out..."
rm -f ${data_dir}/*
${command}
echo "Starting round ${round} dump in..."
${taosdump2_0} -c ${taoscfg2_0} -i ${data_dir}
fi
......@@ -135,7 +135,7 @@ int tsRpcOverhead;
static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0;
static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
// server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0
......@@ -221,13 +221,15 @@ static void rpcFree(void *p) {
free(p);
}
void rpcInit(void) {
int32_t rpcInit(void) {
tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree);
return 0;
}
void rpcCleanup(void) {
......@@ -238,7 +240,7 @@ void rpcCleanup(void) {
void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc;
pthread_once(&tsRpcInit, rpcInit);
//pthread_once(&tsRpcInit, rpcInit);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
......@@ -379,7 +381,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}
int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext;
......@@ -405,14 +407,10 @@ int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|| type == TSDB_MSG_TYPE_CM_SHOW )
pContext->connType = RPC_CONN_TCPC;
// set the handle to pContext, so app can cancel the request
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
pContext->rid = taosAddRef(tsRpcRefId, pContext);
if (pRid) *pRid = pContext->rid;
rpcSendReqToServer(pRpc, pContext);
return pContext->rid;
}
void rpcSendResponse(const SRpcMsg *pRsp) {
......@@ -528,7 +526,7 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
pContext->pRsp = pRsp;
pContext->pSet = pEpSet;
rpcSendRequest(shandle, pEpSet, pMsg);
rpcSendRequest(shandle, pEpSet, pMsg, NULL);
tsem_wait(&sem);
tsem_destroy(&sem);
......
......@@ -57,7 +57,7 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if ( pInfo->num % 20000 == 0 )
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem);
......
......@@ -578,6 +578,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
#if 0
for (int32_t i = 0; i < pNode->replica; ++i) {
if (i == index) continue;
pPeer = pNode->peerInfo[i];
if (pPeer->version == nodeVersion) {
pPeer->role = TAOS_SYNC_ROLE_SLAVE;
......
......@@ -182,6 +182,8 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEve
return 0;
}
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
ret = read(sfd, pHead->cont, pHead->len);
if (ret < 0) return -1;
......
......@@ -57,7 +57,7 @@ void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) {
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
}
......
......@@ -208,6 +208,18 @@ typedef struct {
} SFileGroupIter;
// ------------------ tsdbMain.c
typedef struct {
int32_t totalLen;
int32_t len;
SDataRow row;
} SSubmitBlkIter;
typedef struct {
int32_t totalLen;
int32_t len;
void * pMsg;
} SSubmitMsgIter;
typedef struct {
int8_t state;
......@@ -430,7 +442,6 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
// ------------------ tsdbMemTable.c
int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
......
......@@ -516,7 +516,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
SFile file;
SFile * pFile = &file;
strncpy(pFile->fname, fname, TSDB_FILENAME_LEN);
strncpy(pFile->fname, fname, TSDB_FILENAME_LEN - 1);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
......
......@@ -32,18 +32,6 @@
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
typedef struct {
int32_t totalLen;
int32_t len;
SDataRow row;
} SSubmitBlkIter;
typedef struct {
int32_t totalLen;
int32_t len;
void * pMsg;
} SSubmitMsgIter;
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg);
static int32_t tsdbUnsetRepoEnv(char *rootDir);
......@@ -52,20 +40,13 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg);
static char * tsdbGetCfgFname(char *rootDir);
static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
static void tsdbFreeRepo(STsdbRepo *pRepo);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbRestoreInfo(STsdbRepo *pRepo);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
......@@ -177,40 +158,6 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
tsdbDebug("vgId:%d repository is closed", vgId);
}
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdbRepo * pRepo = (STsdbRepo *)repo;
SSubmitMsgIter msgIter = {0};
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
}
return -1;
}
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
SSubmitBlk *pBlock = NULL;
int32_t affectedrows = 0;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
return -1;
}
}
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0;
}
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -735,93 +682,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
}
}
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = pMsg->length;
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
int64_t points = 0;
ASSERT(pBlock->tid < pMeta->maxTables);
STable *pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64,
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1;
(*affectedrows)++;
points++;
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
return 0;
}
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
} else {
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
}
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
SDataRow row = pIter->row;
if (row == NULL) return NULL;
pIter->len += dataRowLen(row);
if (pIter->len >= pIter->totalLen) {
pIter->row = NULL;
} else {
pIter->row = (char *)row + dataRowLen(row);
}
return row;
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
......@@ -855,14 +715,6 @@ _err:
return -1;
}
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
return 0;
}
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
int8_t ocompression = pRepo->config.compression;
pRepo->config.compression = compression;
......@@ -959,134 +811,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
return buf;
}
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) {
return 0;
} else {
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
}
if (pBlock->sversion > sversion) { // may need to update table schema
if (pBlock->schemaLen > 0) {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
STColumn *pTCol = (STColumn *)pBlock->data;
STSchemaBuilder schemaBuilder = {0};
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
for (int i = 0; i < numOfCols; i++) {
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
}
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
if (pNSchema == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
} else {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
return -1;
}
} else {
ASSERT(pBlock->sversion >= 0);
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
}
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
STsdbMeta * pMeta = pRepo->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// TODO
// STsdbCache *pCache = pRepo->tsdbCache;
......
......@@ -18,7 +18,6 @@
#define TSDB_DATA_SKIPLIST_LEVEL 5
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
......@@ -32,103 +31,49 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
// ---------------- INTERNAL FUNCTIONS ----------------
int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta;
TKEY tkey = dataRowTKey(row);
TSKEY key = dataRowKey(row);
SMemTable * pMemTable = pRepo->mem;
STableData *pTableData = NULL;
bool isRowDelete = TKEY_IS_DELETED(tkey);
if (isRowDelete) {
if (!pCfg->update) {
tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now);
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdbRepo * pRepo = (STsdbRepo *)repo;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
int32_t affectedrows = 0;
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
}
if (key > TABLE_LASTKEY(pTable)) {
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
return 0;
}
}
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
return -1;
}
dataRowCpy(pRow, row);
// Operations above may change pRepo->mem, retake those values
ASSERT(pRepo->mem != NULL);
pMemTable = pRepo->mem;
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
return -1;
}
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) {
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64
" to table %s while create new table data object since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
tsdbInitSubmitMsgIter(pMsg, &msgIter);
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
return -1;
}
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
int64_t oldSize = SL_SIZE(pTableData->pData);
if (tSkipListPut(pTableData->pData, pRow) == NULL) {
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
} else {
int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize;
if (isRowDelete) {
if (TABLE_LASTKEY(pTable) == key) {
// TODO: need to update table last key here (may from file)
}
} else {
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
}
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
if (pMemTable->keyLast < key) pMemTable->keyLast = key;
pMemTable->numOfRows += deltaSize;
if (pTableData->keyFirst > key) pTableData->keyFirst = key;
if (pTableData->keyLast < key) pTableData->keyLast = key;
pTableData->numOfRows += deltaSize;
}
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
key);
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0;
}
// ---------------- INTERNAL FUNCTIONS ----------------
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
if (pMemTable == NULL) return 0;
int ref = T_REF_INC(pMemTable);
......@@ -152,7 +97,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
}
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) {
tsdbUnlockRepo(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
......@@ -189,6 +134,8 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
}
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
if (pMem != NULL) {
taosRUnLockLatch(&(pMem->latch));
tsdbUnRefMemTable(pRepo, pMem);
......@@ -197,8 +144,6 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
if (pIMem != NULL) {
tsdbUnRefMemTable(pRepo, pIMem);
}
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
}
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
......@@ -230,6 +175,10 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
ASSERT(pRepo->mem->extraBuffList != NULL);
SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
if (pNode == NULL) {
if (listNEles(pRepo->mem->extraBuffList) == 0) {
tdListFree(pRepo->mem->extraBuffList);
pRepo->mem->extraBuffList = NULL;
}
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
......@@ -260,18 +209,18 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
}
int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pRepo->mem == NULL) return 0;
SMemTable *pIMem = pRepo->imem;
if (pRepo->mem != NULL) {
sem_wait(&(pRepo->readyToCommit));
sem_wait(&(pRepo->readyToCommit));
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
}
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
......@@ -469,25 +418,6 @@ _exit:
}
// ---------------- LOCAL FUNCTIONS ----------------
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
ASSERT(pRepo->mem != NULL);
if (pRepo->mem->extraBuffList == NULL) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL);
pBufBlock->offset -= bytes;
pBufBlock->remain += bytes;
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
} else {
SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -(int)(sizeof(SListNode)));
ASSERT(listTail(pRepo->mem->extraBuffList) == pNode);
tdListPopNode(pRepo->mem->extraBuffList, pNode);
free(pNode);
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
}
}
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -848,4 +778,400 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
}
return 0;
}
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
return 0;
}
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
SDataRow row = pIter->row;
if (row == NULL) return NULL;
pIter->len += dataRowLen(row);
if (pIter->len >= pIter->totalLen) {
pIter->row = NULL;
} else {
pIter->row = (char *)row + dataRowLen(row);
}
return row;
}
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
dataRowKey(row));
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
STsdbMeta * pMeta = pRepo->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
int64_t points = 0;
STable * pTable = NULL;
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
void ** rows = NULL;
int rowCounter = 0;
ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
rows = (void **)calloc(pBlock->numOfRows, sizeof(void *));
if (rows == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
tsdbFreeRows(pRepo, rows, rowCounter);
goto _err;
}
(*affectedrows)++;
points++;
if (rows[rowCounter] != NULL) {
rowCounter++;
}
}
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err;
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
free(rows);
return 0;
_err:
free(rows);
return -1;
}
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) {
STsdbCfg * pCfg = &pRepo->config;
TKEY tkey = dataRowTKey(row);
TSKEY key = dataRowKey(row);
bool isRowDelete = TKEY_IS_DELETED(tkey);
if (isRowDelete) {
if (!pCfg->update) {
tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
if (key > TABLE_LASTKEY(pTable)) {
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
return 0;
}
}
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
return -1;
}
dataRowCpy(pRow, row);
ppRow[0] = pRow;
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
key);
return 0;
}
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = pMsg->length;
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
} else {
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
}
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) {
return 0;
} else {
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
}
if (pBlock->sversion > sversion) { // may need to update table schema
if (pBlock->schemaLen > 0) {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
STColumn *pTCol = (STColumn *)pBlock->data;
STSchemaBuilder schemaBuilder = {0};
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
for (int i = 0; i < numOfCols; i++) {
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
}
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
if (pNSchema == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
} else {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
return -1;
}
} else {
ASSERT(pBlock->sversion >= 0);
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
}
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
return 0;
}
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter) {
if (rowCounter < 1) return 0;
SMemTable * pMemTable = NULL;
STableData *pTableData = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &(pRepo->config);
ASSERT(pRepo->mem != NULL);
pMemTable = pRepo->mem;
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
tsdbFreeRows(pRepo, rows, rowCounter);
return -1;
}
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) {
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
tsdbFreeRows(pRepo, rows, rowCounter);
return -1;
}
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
int64_t osize = SL_SIZE(pTableData->pData);
tSkipListPutBatch(pTableData->pData, rows, rowCounter);
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
if (pMemTable->keyFirst > dataRowKey(rows[0])) pMemTable->keyFirst = dataRowKey(rows[0]);
if (pMemTable->keyLast < dataRowKey(rows[rowCounter - 1])) pMemTable->keyLast = dataRowKey(rows[rowCounter - 1]);
pMemTable->numOfRows += dsize;
if (pTableData->keyFirst > dataRowKey(rows[0])) pTableData->keyFirst = dataRowKey(rows[0]);
if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]);
pTableData->numOfRows += dsize;
// TODO: impl delete row thing
if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]);
return 0;
}
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
ASSERT(pRepo->mem != NULL);
STsdbBufPool *pBufPool = pRepo->pPool;
for (int i = rowCounter - 1; i >= 0; --i) {
SDataRow row = (SDataRow)rows[i];
int bytes = (int)dataRowLen(row);
if (pRepo->mem->extraBuffList == NULL) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL && pBufBlock->offset >= bytes);
pBufBlock->offset -= bytes;
pBufBlock->remain += bytes;
ASSERT(row == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
if (pBufBlock->offset == 0) { // return the block to buffer pool
tsdbLockRepo(pRepo);
SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList);
tdListPrependNode(pBufPool->bufBlockList, pNode);
tsdbUnlockRepo(pRepo);
}
} else {
ASSERT(listNEles(pRepo->mem->extraBuffList) > 0);
SListNode *pNode = tdListPopTail(pRepo->mem->extraBuffList);
ASSERT(row == pNode->data);
free(pNode);
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
if (listNEles(pRepo->mem->extraBuffList) == 0) {
tdListFree(pRepo->mem->extraBuffList);
pRepo->mem->extraBuffList = NULL;
}
}
}
}
\ No newline at end of file
......@@ -1595,7 +1595,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
tblkIdx++;
} else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) {
// Delete the block and do some stuff
ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN);
// ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN);
if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1;
*pCommitIter->pIter = slIter;
if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
......
......@@ -131,6 +131,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
__sl_key_fn_t fn);
void tSkipListDestroy(SSkipList *pSkipList);
SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData);
void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata);
SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel);
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList);
......
......@@ -287,17 +287,17 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
tsLogObj.fileNum = maxFileNum;
taosGetLogFileName(fn);
if (strlen(fn) < LOG_FILE_NAME_LEN + 50 - 2) {
strcpy(name, fn);
strcat(name, ".0");
}
bool log0Exist = stat(name, &logstat0) >= 0;
if (strlen(fn) < LOG_FILE_NAME_LEN + 50 - 2) {
strcpy(name, fn);
strcat(name, ".1");
}
bool log0Exist = stat(name, &logstat0) >= 0;
bool log1Exist = stat(name, &logstat1) >= 0;
// if none of the log files exist, open 0, if both exists, open the old one
......
......@@ -426,11 +426,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
(*pSet->fp)(pNode->p);
uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
free(pNode);
released = 1;
} else {
uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, count:%d", rsetId, pNode->p, rid, pNode->count);
}
} else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
......
......@@ -24,10 +24,12 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
static void tSkipListCorrectLevel(SSkipList *pSkipList);
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode);
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData);
static SSkipListNode * tSkipListNewNode(uint8_t level);
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward);
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData);
static SSkipListNode *tSkipListNewNode(uint8_t level);
#define tSkipListFreeNode(n) tfree((n))
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
bool hasDup);
static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList);
......@@ -109,30 +111,85 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
if (pSkipList == NULL || pData == NULL) return NULL;
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
uint8_t dupMode = SL_DUP_MODE(pSkipList);
SSkipListNode *pNode = NULL;
tSkipListWLock(pSkipList);
bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
if (dupMode == SL_UPDATE_DUP_KEY) {
pNode = SL_NODE_GET_BACKWARD_POINTER(backward[0], 0);
atomic_store_ptr(&(pNode->pData), pData);
}
} else {
pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList));
if (pNode != NULL) {
pNode->pData = pData;
tSkipListUnlock(pSkipList);
return pNode;
}
// Put a batch of data into skiplist. The batch of data must be in ascending order
void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
bool hasDup = false;
char * pKey = NULL;
char * pDataKey = NULL;
int compare = 0;
tSkipListWLock(pSkipList);
tSkipListDoInsert(pSkipList, backward, pNode);
// backward to put the first data
hasDup = tSkipListGetPosToPut(pSkipList, backward, ppData[0]);
tSkipListPutImpl(pSkipList, ppData[0], backward, false, hasDup);
for (int level = 0; level < pSkipList->maxLevel; level++) {
forward[level] = SL_NODE_GET_BACKWARD_POINTER(backward[level], level);
}
// forward to put the rest of data
for (int idata = 1; idata < ndata; idata++) {
pDataKey = pSkipList->keyFn(ppData[idata]);
// Compare max key
pKey = SL_GET_MAX_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey);
if (compare > 0) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
}
hasDup = false;
} else {
SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
if (i < pSkipList->level) {
// set new px
if (forward[i] != pSkipList->pHead) {
if (px == pSkipList->pHead ||
pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, forward[i]), SL_GET_NODE_KEY(pSkipList, px)) > 0) {
px = forward[i];
}
}
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i);
while (p != pSkipList->pTail) {
pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) {
if (compare == 0) hasDup = true;
break;
} else {
px = p;
p = SL_NODE_GET_FORWARD_POINTER(px, i);
}
}
}
forward[i] = px;
}
}
tSkipListPutImpl(pSkipList, ppData[idata], forward, true, hasDup);
}
tSkipListUnlock(pSkipList);
return pNode;
}
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
......@@ -310,22 +367,25 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
}
}
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode) {
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) {
for (int32_t i = 0; i < pNode->level; ++i) {
if (i >= pSkipList->level) {
SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = pSkipList->pHead;
SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
SSkipListNode *x = direction[i];
if (isForward) {
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x;
SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i);
SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode;
SL_NODE_GET_FORWARD_POINTER(pNode, i) = next;
SL_NODE_GET_FORWARD_POINTER(x, i) = pNode;
} else {
SSkipListNode *x = backward[i];
SL_NODE_GET_FORWARD_POINTER(pNode, i) = x;
SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i);
SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode;
SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev;
SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
}
}
......@@ -377,7 +437,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
char * pDataKey = pSkipList->keyFn(pData);
if (pSkipList->size == 0) {
for (int i = 0; i < pSkipList->level; i++) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = pSkipList->pTail;
}
} else {
......@@ -387,7 +447,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
pKey = SL_GET_MAX_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey);
if (compare >= 0) {
for (int i = 0; i < pSkipList->level; i++) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = pSkipList->pTail;
}
......@@ -398,7 +458,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
pKey = SL_GET_MIN_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey);
if (compare < 0) {
for (int i = 0; i < pSkipList->level; i++) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i);
}
......@@ -406,18 +466,20 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
}
SSkipListNode *px = pSkipList->pTail;
for (int i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
while (p != pSkipList->pHead) {
pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare <= 0) {
if (compare == 0 && !hasDupKey) hasDupKey = true;
break;
} else {
px = p;
p = SL_NODE_GET_BACKWARD_POINTER(px, i);
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
if (i < pSkipList->level) {
SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
while (p != pSkipList->pHead) {
pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare <= 0) {
if (compare == 0 && !hasDupKey) hasDupKey = true;
break;
} else {
px = p;
p = SL_NODE_GET_BACKWARD_POINTER(px, i);
}
}
}
......@@ -579,6 +641,32 @@ static SSkipListNode *tSkipListNewNode(uint8_t level) {
return pNode;
}
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
bool hasDup) {
uint8_t dupMode = SL_DUP_MODE(pSkipList);
SSkipListNode *pNode = NULL;
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
if (dupMode == SL_UPDATE_DUP_KEY) {
if (isForward) {
pNode = SL_NODE_GET_FORWARD_POINTER(direction[0], 0);
} else {
pNode = SL_NODE_GET_BACKWARD_POINTER(direction[0], 0);
}
atomic_store_ptr(&(pNode->pData), pData);
}
} else {
pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList));
if (pNode != NULL) {
pNode->pData = pData;
tSkipListDoInsert(pSkipList, direction, pNode, isForward);
}
}
return pNode;
}
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock);
......
......@@ -107,7 +107,7 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
while (nleft > 0) {
nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
if (nwritten <= 0) {
if (errno == EINTR)
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
continue;
else
return -1;
......@@ -133,7 +133,7 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
return -1;
......
......@@ -315,7 +315,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->version = walGetVersion(pVnode->wal);
}
tsdbSyncCommit(pVnode->tsdb);
code = tsdbSyncCommit(pVnode->tsdb);
if (code != 0) {
vError("vgId:%d, failed to commit after restore from wal since %s", pVnode->vgId, tstrerror(code));
vnodeCleanUp(pVnode);
return code;
}
walRemoveAllOldFiles(pVnode->wal);
walRenew(pVnode->wal);
......@@ -412,6 +418,7 @@ void vnodeRelease(void *pVnodeRaw) {
}
if (pVnode->wal) {
walRemoveAllOldFiles(pVnode->wal);
walClose(pVnode->wal);
pVnode->wal = NULL;
}
......@@ -589,7 +596,11 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
return walRenew(pVnode->wal);
if (pVnode->status == TAOS_VN_STATUS_INIT) {
return 0;
} else {
return walRenew(pVnode->wal);
}
}
if (status == TSDB_STATUS_COMMIT_OVER) {
......
......@@ -124,15 +124,7 @@ void walClose(void *handle) {
SWal *pWal = handle;
pthread_mutex_lock(&pWal->mutex);
taosClose(pWal->fd);
if (pWal->keep != TAOS_WAL_KEEP) {
walRemoveAllOldFiles(pWal);
} else {
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
}
pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal->rid);
}
......
......@@ -67,6 +67,7 @@ void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
if (pWal->keep == TAOS_WAL_KEEP) return;
if (pWal->fd <= 0) return;
pthread_mutex_lock(&pWal->mutex);
......@@ -91,6 +92,8 @@ void walRemoveAllOldFiles(void *handle) {
SWal * pWal = handle;
int64_t fileId = -1;
pthread_mutex_lock(&pWal->mutex);
while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
......@@ -100,6 +103,7 @@ void walRemoveAllOldFiles(void *handle) {
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
}
}
pthread_mutex_unlock(&pWal->mutex);
}
int32_t walWrite(void *handle, SWalHead *pHead) {
......
......@@ -155,6 +155,9 @@ python3 ./test.py -f query/queryCountCSVData.py
python3 ./test.py -f query/natualInterval.py
python3 ./test.py -f query/bug1471.py
#python3 ./test.py -f query/dataLossTest.py
python3 ./test.py -f query/bug1874.py
python3 ./test.py -f query/bug1875.py
python3 ./test.py -f query/bug1876.py
#stream
python3 ./test.py -f stream/metric_1.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册