提交 449d6676 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -4962,6 +4962,7 @@ static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { ...@@ -4962,6 +4962,7 @@ static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->commitTime = htonl(pCreateDb->commitTime); pMsg->commitTime = htonl(pCreateDb->commitTime);
pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock); pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock);
pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock); pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock);
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
pMsg->compression = pCreateDb->compressionLevel; pMsg->compression = pCreateDb->compressionLevel;
pMsg->walLevel = (char)pCreateDb->walLevel; pMsg->walLevel = (char)pCreateDb->walLevel;
pMsg->replications = pCreateDb->replica; pMsg->replications = pCreateDb->replica;
...@@ -5529,6 +5530,13 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { ...@@ -5529,6 +5530,13 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
val = htonl(pCreate->fsyncPeriod);
if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) {
snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val,
TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
if (pCreate->compression != -1 && if (pCreate->compression != -1 &&
(pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) { (pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression, snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression,
......
...@@ -80,6 +80,7 @@ extern int16_t tsCommitTime; // seconds ...@@ -80,6 +80,7 @@ extern int16_t tsCommitTime; // seconds
extern int32_t tsTimePrecision; extern int32_t tsTimePrecision;
extern int16_t tsCompression; extern int16_t tsCompression;
extern int16_t tsWAL; extern int16_t tsWAL;
extern int32_t tsFsyncPeriod;
extern int32_t tsReplications; extern int32_t tsReplications;
// balance // balance
......
...@@ -110,6 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds ...@@ -110,6 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION; int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsMaxVgroupsPerDb = 0; int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMinTablePerVnode = 100; int32_t tsMinTablePerVnode = 100;
...@@ -715,6 +716,16 @@ static void doInitGlobalConfig() { ...@@ -715,6 +716,16 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "fsync";
cfg.ptr = &tsFsyncPeriod;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_FSYNC_PERIOD;
cfg.maxValue = TSDB_MAX_FSYNC_PERIOD;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "replica"; cfg.option = "replica";
cfg.ptr = &tsReplications; cfg.ptr = &tsReplications;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -401,6 +401,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -401,6 +401,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep); pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock); pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock); pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
......
...@@ -332,6 +332,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -332,6 +332,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_WAL_LEVEL 2 #define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_DB_REPLICA_OPTION 1 #define TSDB_MIN_DB_REPLICA_OPTION 1
#define TSDB_MAX_DB_REPLICA_OPTION 3 #define TSDB_MAX_DB_REPLICA_OPTION 3
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1 #define TSDB_DEFAULT_DB_REPLICA_OPTION 1
......
...@@ -515,6 +515,7 @@ typedef struct { ...@@ -515,6 +515,7 @@ typedef struct {
int32_t minRowsPerFileBlock; int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock; int32_t maxRowsPerFileBlock;
int32_t commitTime; int32_t commitTime;
int32_t fsyncPeriod;
uint8_t precision; // time resolution uint8_t precision; // time resolution
int8_t compression; int8_t compression;
int8_t walLevel; int8_t walLevel;
...@@ -608,6 +609,7 @@ typedef struct { ...@@ -608,6 +609,7 @@ typedef struct {
int32_t minRowsPerFileBlock; int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock; int32_t maxRowsPerFileBlock;
int32_t commitTime; int32_t commitTime;
int32_t fsyncPeriod;
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t walLevel; int8_t walLevel;
......
...@@ -110,117 +110,117 @@ ...@@ -110,117 +110,117 @@
#define TK_BLOCKS 92 #define TK_BLOCKS 92
#define TK_CTIME 93 #define TK_CTIME 93
#define TK_WAL 94 #define TK_WAL 94
#define TK_COMP 95 #define TK_FSYNC 95
#define TK_PRECISION 96 #define TK_COMP 96
#define TK_LP 97 #define TK_PRECISION 97
#define TK_RP 98 #define TK_LP 98
#define TK_TAGS 99 #define TK_RP 99
#define TK_USING 100 #define TK_TAGS 100
#define TK_AS 101 #define TK_USING 101
#define TK_COMMA 102 #define TK_AS 102
#define TK_NULL 103 #define TK_COMMA 103
#define TK_SELECT 104 #define TK_NULL 104
#define TK_UNION 105 #define TK_SELECT 105
#define TK_ALL 106 #define TK_UNION 106
#define TK_FROM 107 #define TK_ALL 107
#define TK_VARIABLE 108 #define TK_FROM 108
#define TK_INTERVAL 109 #define TK_VARIABLE 109
#define TK_FILL 110 #define TK_INTERVAL 110
#define TK_SLIDING 111 #define TK_FILL 111
#define TK_ORDER 112 #define TK_SLIDING 112
#define TK_BY 113 #define TK_ORDER 113
#define TK_ASC 114 #define TK_BY 114
#define TK_DESC 115 #define TK_ASC 115
#define TK_GROUP 116 #define TK_DESC 116
#define TK_HAVING 117 #define TK_GROUP 117
#define TK_LIMIT 118 #define TK_HAVING 118
#define TK_OFFSET 119 #define TK_LIMIT 119
#define TK_SLIMIT 120 #define TK_OFFSET 120
#define TK_SOFFSET 121 #define TK_SLIMIT 121
#define TK_WHERE 122 #define TK_SOFFSET 122
#define TK_NOW 123 #define TK_WHERE 123
#define TK_RESET 124 #define TK_NOW 124
#define TK_QUERY 125 #define TK_RESET 125
#define TK_ADD 126 #define TK_QUERY 126
#define TK_COLUMN 127 #define TK_ADD 127
#define TK_TAG 128 #define TK_COLUMN 128
#define TK_CHANGE 129 #define TK_TAG 129
#define TK_SET 130 #define TK_CHANGE 130
#define TK_KILL 131 #define TK_SET 131
#define TK_CONNECTION 132 #define TK_KILL 132
#define TK_STREAM 133 #define TK_CONNECTION 133
#define TK_COLON 134 #define TK_STREAM 134
#define TK_ABORT 135 #define TK_COLON 135
#define TK_AFTER 136 #define TK_ABORT 136
#define TK_ATTACH 137 #define TK_AFTER 137
#define TK_BEFORE 138 #define TK_ATTACH 138
#define TK_BEGIN 139 #define TK_BEFORE 139
#define TK_CASCADE 140 #define TK_BEGIN 140
#define TK_CLUSTER 141 #define TK_CASCADE 141
#define TK_CONFLICT 142 #define TK_CLUSTER 142
#define TK_COPY 143 #define TK_CONFLICT 143
#define TK_DEFERRED 144 #define TK_COPY 144
#define TK_DELIMITERS 145 #define TK_DEFERRED 145
#define TK_DETACH 146 #define TK_DELIMITERS 146
#define TK_EACH 147 #define TK_DETACH 147
#define TK_END 148 #define TK_EACH 148
#define TK_EXPLAIN 149 #define TK_END 149
#define TK_FAIL 150 #define TK_EXPLAIN 150
#define TK_FOR 151 #define TK_FAIL 151
#define TK_IGNORE 152 #define TK_FOR 152
#define TK_IMMEDIATE 153 #define TK_IGNORE 153
#define TK_INITIALLY 154 #define TK_IMMEDIATE 154
#define TK_INSTEAD 155 #define TK_INITIALLY 155
#define TK_MATCH 156 #define TK_INSTEAD 156
#define TK_KEY 157 #define TK_MATCH 157
#define TK_OF 158 #define TK_KEY 158
#define TK_RAISE 159 #define TK_OF 159
#define TK_REPLACE 160 #define TK_RAISE 160
#define TK_RESTRICT 161 #define TK_REPLACE 161
#define TK_ROW 162 #define TK_RESTRICT 162
#define TK_STATEMENT 163 #define TK_ROW 163
#define TK_TRIGGER 164 #define TK_STATEMENT 164
#define TK_VIEW 165 #define TK_TRIGGER 165
#define TK_COUNT 166 #define TK_VIEW 166
#define TK_SUM 167 #define TK_COUNT 167
#define TK_AVG 168 #define TK_SUM 168
#define TK_MIN 169 #define TK_AVG 169
#define TK_MAX 170 #define TK_MIN 170
#define TK_FIRST 171 #define TK_MAX 171
#define TK_LAST 172 #define TK_FIRST 172
#define TK_TOP 173 #define TK_LAST 173
#define TK_BOTTOM 174 #define TK_TOP 174
#define TK_STDDEV 175 #define TK_BOTTOM 175
#define TK_PERCENTILE 176 #define TK_STDDEV 176
#define TK_APERCENTILE 177 #define TK_PERCENTILE 177
#define TK_LEASTSQUARES 178 #define TK_APERCENTILE 178
#define TK_HISTOGRAM 179 #define TK_LEASTSQUARES 179
#define TK_DIFF 180 #define TK_HISTOGRAM 180
#define TK_SPREAD 181 #define TK_DIFF 181
#define TK_TWA 182 #define TK_SPREAD 182
#define TK_INTERP 183 #define TK_TWA 183
#define TK_LAST_ROW 184 #define TK_INTERP 184
#define TK_RATE 185 #define TK_LAST_ROW 185
#define TK_IRATE 186 #define TK_RATE 186
#define TK_SUM_RATE 187 #define TK_IRATE 187
#define TK_SUM_IRATE 188 #define TK_SUM_RATE 188
#define TK_AVG_RATE 189 #define TK_SUM_IRATE 189
#define TK_AVG_IRATE 190 #define TK_AVG_RATE 190
#define TK_TBID 191 #define TK_AVG_IRATE 191
#define TK_SEMI 192 #define TK_TBID 192
#define TK_NONE 193 #define TK_SEMI 193
#define TK_PREV 194 #define TK_NONE 194
#define TK_LINEAR 195 #define TK_PREV 195
#define TK_IMPORT 196 #define TK_LINEAR 196
#define TK_METRIC 197 #define TK_IMPORT 197
#define TK_TBNAME 198 #define TK_METRIC 198
#define TK_JOIN 199 #define TK_TBNAME 199
#define TK_METRICS 200 #define TK_JOIN 200
#define TK_STABLE 201 #define TK_METRICS 201
#define TK_INSERT 202 #define TK_STABLE 202
#define TK_INTO 203 #define TK_INSERT 203
#define TK_VALUES 204 #define TK_INTO 204
#define TK_VALUES 205
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
......
...@@ -35,6 +35,7 @@ typedef struct { ...@@ -35,6 +35,7 @@ typedef struct {
typedef struct { typedef struct {
int8_t walLevel; // wal level int8_t walLevel; // wal level
int32_t fsyncPeriod; // millisecond
int8_t wals; // number of WAL files; int8_t wals; // number of WAL files;
int8_t keep; // keep the wal file when closed int8_t keep; // keep the wal file when closed
} SWalCfg; } SWalCfg;
......
...@@ -179,8 +179,8 @@ static struct argp_option options[] = { ...@@ -179,8 +179,8 @@ static struct argp_option options[] = {
{"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3}, {"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3},
{"end-time", 'E', "END_TIME", 0, "End time to dump.", 3}, {"end-time", 'E', "END_TIME", 0, "End time to dump.", 3},
{"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3}, {"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3},
{"table-batch", 'T', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3}, {"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
{"thread_num", 't', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3}, {"thread_num", 'T', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3},
{"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3}, {"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3},
{0}}; {0}};
...@@ -304,10 +304,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -304,10 +304,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'N': case 'N':
arguments->data_batch = atoi(arg); arguments->data_batch = atoi(arg);
break; break;
case 'T': case 't':
arguments->table_batch = atoi(arg); arguments->table_batch = atoi(arg);
break; break;
case 't': case 'T':
arguments->thread_num = atoi(arg); arguments->thread_num = atoi(arg);
break; break;
case OPT_ABORT: case OPT_ABORT:
...@@ -406,7 +406,7 @@ int main(int argc, char *argv[]) { ...@@ -406,7 +406,7 @@ int main(int argc, char *argv[]) {
printf("password: %s\n", tsArguments.password); printf("password: %s\n", tsArguments.password);
printf("port: %u\n", tsArguments.port); printf("port: %u\n", tsArguments.port);
printf("cversion: %s\n", tsArguments.cversion); printf("cversion: %s\n", tsArguments.cversion);
printf("mysqlFlag: %d", tsArguments.mysqlFlag); printf("mysqlFlag: %d\n", tsArguments.mysqlFlag);
printf("outpath: %s\n", tsArguments.outpath); printf("outpath: %s\n", tsArguments.outpath);
printf("inpath: %s\n", tsArguments.inpath); printf("inpath: %s\n", tsArguments.inpath);
printf("encode: %s\n", tsArguments.encode); printf("encode: %s\n", tsArguments.encode);
...@@ -821,7 +821,7 @@ _exit_failure: ...@@ -821,7 +821,7 @@ _exit_failure:
return -1; return -1;
} }
int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) { int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon, bool isSuperTable) {
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
TAOS_RES *tmpResult = NULL; TAOS_RES *tmpResult = NULL;
int count = 0; int count = 0;
...@@ -832,6 +832,13 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) { ...@@ -832,6 +832,13 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) {
return -1; return -1;
} }
char* tbuf = (char *)malloc(COMMAND_SIZE);
if (tbuf == NULL) {
fprintf(stderr, "failed to allocate memory\n");
free(tempCommand);
return -1;
}
sprintf(tempCommand, "describe %s", table); sprintf(tempCommand, "describe %s", table);
tmpResult = taos_query(taosCon, tempCommand); tmpResult = taos_query(taosCon, tempCommand);
...@@ -862,6 +869,92 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) { ...@@ -862,6 +869,92 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) {
taos_free_result(tmpResult); taos_free_result(tmpResult);
tmpResult = NULL; tmpResult = NULL;
if (isSuperTable) {
free(tempCommand);
return count;
}
// if chidl-table have tag, using select tagName from table to get tagValue
for (int i = 0 ; i < count; i++) {
if (strcmp(tableDes->cols[i].note, "TAG") != 0) continue;
sprintf(tempCommand, "select %s from %s", tableDes->cols[i].field, table);
tmpResult = taos_query(taosCon, tempCommand);
code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tempCommand);
free(tempCommand);
taos_free_result(tmpResult);
return -1;
}
fields = taos_fetch_fields(tmpResult);
row = taos_fetch_row(tmpResult);
if (NULL == row) {
fprintf(stderr, " fetch failed to run command %s\n", tempCommand);
free(tempCommand);
taos_free_result(tmpResult);
return -1;
}
switch (fields[0].type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(tableDes->cols[i].note, "%d", ((((int)(*((char *)row[0]))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(tableDes->cols[i].note, "%d", (int)(*((char *)row[0])));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(tableDes->cols[i].note, "%d", (int)(*((short *)row[0])));
break;
case TSDB_DATA_TYPE_INT:
sprintf(tableDes->cols[i].note, "%d", *((int *)row[0]));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(tableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[0]));
break;
case TSDB_DATA_TYPE_FLOAT:
sprintf(tableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[0]));
break;
case TSDB_DATA_TYPE_DOUBLE:
sprintf(tableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[0]));
break;
case TSDB_DATA_TYPE_BINARY:
tableDes->cols[i].note[0] = '\'';
converStringToReadable((char *)row[0], fields[0].bytes, tbuf, COMMAND_SIZE);
char* pstr = stpcpy(&(tableDes->cols[i].note[1]), tbuf);
*(pstr++) = '\'';
break;
case TSDB_DATA_TYPE_NCHAR:
convertNCharToReadable((char *)row[0], fields[0].bytes, tbuf, COMMAND_SIZE);
sprintf(tableDes->cols[i].note, "\'%s\'", tbuf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
#if 0
if (!arguments->mysqlFlag) {
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[0]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
#endif
break;
default:
break;
}
taos_free_result(tmpResult);
tmpResult = NULL;
}
free(tempCommand); free(tempCommand);
return count; return count;
...@@ -886,23 +979,25 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI ...@@ -886,23 +979,25 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/ */
count = taosGetTableDes(table, tableDes, taosCon); count = taosGetTableDes(table, tableDes, taosCon, false);
if (count < 0) { if (count < 0) {
free(tableDes); free(tableDes);
return -1; return -1;
} }
// create child-table using super-table
taosDumpCreateMTableClause(tableDes, metric, count, fp); taosDumpCreateMTableClause(tableDes, metric, count, fp);
} else { // dump table definition } else { // dump table definition
count = taosGetTableDes(table, tableDes, taosCon); count = taosGetTableDes(table, tableDes, taosCon, false);
if (count < 0) { if (count < 0) {
free(tableDes); free(tableDes);
return -1; return -1;
} }
// create normal-table or super-table
taosDumpCreateTableClause(tableDes, count, fp); taosDumpCreateTableClause(tableDes, count, fp);
} }
...@@ -1033,7 +1128,7 @@ int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon) { ...@@ -1033,7 +1128,7 @@ int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon) {
exit(-1); exit(-1);
} }
count = taosGetTableDes(table, tableDes, taosCon); count = taosGetTableDes(table, tableDes, taosCon, true);
if (count < 0) { if (count < 0) {
free(tableDes); free(tableDes);
...@@ -1083,7 +1178,6 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp) ...@@ -1083,7 +1178,6 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
taos_free_result(tmpResult); taos_free_result(tmpResult);
exit(-1); exit(-1);
} }
taos_free_result(tmpResult);
TAOS_FIELD *fields = taos_fetch_fields(tmpResult); TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
...@@ -1291,14 +1385,16 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols ...@@ -1291,14 +1385,16 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
if (counter != count_temp) { if (counter != count_temp) {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note); //pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
} else { } else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note); pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
} }
} else { } else {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note); //pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
} else { } else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
} }
...@@ -1363,7 +1459,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* ...@@ -1363,7 +1459,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
return -1; return -1;
} }
numFields = taos_field_count(taosCon); numFields = taos_field_count(tmpResult);
assert(numFields > 0); assert(numFields > 0);
TAOS_FIELD *fields = taos_fetch_fields(tmpResult); TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
tbuf = (char *)malloc(COMMAND_SIZE); tbuf = (char *)malloc(COMMAND_SIZE);
...@@ -2015,6 +2111,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c ...@@ -2015,6 +2111,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c
} }
memcpy(cmd + cmd_len, line, read_len); memcpy(cmd + cmd_len, line, read_len);
cmd[read_len + cmd_len]= '\0';
if (queryDB(taos, cmd)) { if (queryDB(taos, cmd)) {
fprintf(stderr, "error sql: linenu:%d, file:%s\n", lineNo, fileName); fprintf(stderr, "error sql: linenu:%d, file:%s\n", lineNo, fileName);
} }
......
...@@ -48,6 +48,7 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile) ...@@ -48,6 +48,7 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnode->walCfg.fsyncPeriod);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnode->syncCfg.replica); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnode->syncCfg.replica);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnode->walCfg.wals); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnode->walCfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnode->syncCfg.quorum); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnode->syncCfg.quorum);
...@@ -212,6 +213,13 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile) ...@@ -212,6 +213,13 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
} }
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint; pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
if (!fsyncPeriod || fsyncPeriod->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, fsyncPeriod not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint;
cJSON *wals = cJSON_GetObjectItem(root, "wals"); cJSON *wals = cJSON_GetObjectItem(root, "wals");
if (!wals || wals->type != cJSON_Number) { if (!wals || wals->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, wals not found\n", pVnode->vgId); printf("vgId:%d, failed to read vnode cfg, wals not found\n", pVnode->vgId);
......
...@@ -160,6 +160,7 @@ typedef struct { ...@@ -160,6 +160,7 @@ typedef struct {
int32_t minRowsPerFileBlock; int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock; int32_t maxRowsPerFileBlock;
int32_t commitTime; int32_t commitTime;
int32_t fsyncPeriod;
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t walLevel; int8_t walLevel;
......
...@@ -287,14 +287,14 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { ...@@ -287,14 +287,14 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION; return TSDB_CODE_MND_INVALID_DB_OPTION;
} }
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) { if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) {
mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION, mError("invalid db option fsyncPeriod:%d, valid range: [%d, %d]", pCfg->fsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
TSDB_MAX_DB_REPLICA_OPTION);
return TSDB_CODE_MND_INVALID_DB_OPTION; return TSDB_CODE_MND_INVALID_DB_OPTION;
} }
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL) { if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
mError("invalid db option walLevel:%d must be greater than 0", pCfg->walLevel); mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION,
TSDB_MAX_DB_REPLICA_OPTION);
return TSDB_CODE_MND_INVALID_DB_OPTION; return TSDB_CODE_MND_INVALID_DB_OPTION;
} }
...@@ -318,6 +318,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -318,6 +318,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep; if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep;
if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = tsMinRowsInFileBlock; if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = tsMinRowsInFileBlock;
if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = tsMaxRowsInFileBlock; if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = tsMaxRowsInFileBlock;
if (pCfg->fsyncPeriod <0) pCfg->fsyncPeriod = tsFsyncPeriod;
if (pCfg->commitTime < 0) pCfg->commitTime = tsCommitTime; if (pCfg->commitTime < 0) pCfg->commitTime = tsCommitTime;
if (pCfg->precision < 0) pCfg->precision = tsTimePrecision; if (pCfg->precision < 0) pCfg->precision = tsTimePrecision;
if (pCfg->compression < 0) pCfg->compression = tsCompression; if (pCfg->compression < 0) pCfg->compression = tsCompression;
...@@ -367,6 +368,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs ...@@ -367,6 +368,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
.daysToKeep2 = pCreate->daysToKeep2, .daysToKeep2 = pCreate->daysToKeep2,
.minRowsPerFileBlock = pCreate->minRowsPerFileBlock, .minRowsPerFileBlock = pCreate->minRowsPerFileBlock,
.maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock, .maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock,
.fsyncPeriod = pCreate->fsyncPeriod,
.commitTime = pCreate->commitTime, .commitTime = pCreate->commitTime,
.precision = pCreate->precision, .precision = pCreate->precision,
.compression = pCreate->compression, .compression = pCreate->compression,
...@@ -559,6 +561,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn ...@@ -559,6 +561,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "fsync");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 1; pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT; pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "comp"); strcpy(pSchema[cols].name, "comp");
...@@ -682,6 +690,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -682,6 +690,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
*(int8_t *)pWrite = pDb->cfg.walLevel; *(int8_t *)pWrite = pDb->cfg.walLevel;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.compression; *(int8_t *)pWrite = pDb->cfg.compression;
cols++; cols++;
...@@ -758,6 +770,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { ...@@ -758,6 +770,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1); pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2); pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCreate->commitTime = htonl(pCreate->commitTime); pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
...@@ -785,6 +798,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -785,6 +798,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
int32_t minRows = htonl(pAlter->minRowsPerFileBlock); int32_t minRows = htonl(pAlter->minRowsPerFileBlock);
int32_t maxRows = htonl(pAlter->maxRowsPerFileBlock); int32_t maxRows = htonl(pAlter->maxRowsPerFileBlock);
int32_t commitTime = htonl(pAlter->commitTime); int32_t commitTime = htonl(pAlter->commitTime);
int32_t fsyncPeriod = htonl(pAlter->fsyncPeriod);
int8_t compression = pAlter->compression; int8_t compression = pAlter->compression;
int8_t walLevel = pAlter->walLevel; int8_t walLevel = pAlter->walLevel;
int8_t replications = pAlter->replications; int8_t replications = pAlter->replications;
...@@ -861,6 +875,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -861,6 +875,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
} }
if (fsyncPeriod >= 0 && fsyncPeriod != pDb->cfg.fsyncPeriod) {
mError("db:%s, can't alter fsyncPeriod option", pDb->name);
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
}
if (replications > 0 && replications != pDb->cfg.replications) { if (replications > 0 && replications != pDb->cfg.replications) {
mDebug("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications); mDebug("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications);
newCfg.replications = replications; newCfg.replications = replications;
......
...@@ -170,7 +170,7 @@ static void *sdbGetTableFromId(int32_t tableId) { ...@@ -170,7 +170,7 @@ static void *sdbGetTableFromId(int32_t tableId) {
} }
static int32_t sdbInitWal() { static int32_t sdbInitWal() {
SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1}; SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0};
char temp[TSDB_FILENAME_LEN]; char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/wal", tsMnodeDir); sprintf(temp, "%s/wal", tsMnodeDir);
tsSdbObj.wal = walOpen(temp, &walCfg); tsSdbObj.wal = walOpen(temp, &walCfg);
......
...@@ -757,6 +757,7 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) { ...@@ -757,6 +757,7 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2); pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2);
pCfg->minRowsPerFileBlock = htonl(pDb->cfg.minRowsPerFileBlock); pCfg->minRowsPerFileBlock = htonl(pDb->cfg.minRowsPerFileBlock);
pCfg->maxRowsPerFileBlock = htonl(pDb->cfg.maxRowsPerFileBlock); pCfg->maxRowsPerFileBlock = htonl(pDb->cfg.maxRowsPerFileBlock);
pCfg->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod);
pCfg->commitTime = htonl(pDb->cfg.commitTime); pCfg->commitTime = htonl(pDb->cfg.commitTime);
pCfg->precision = pDb->cfg.precision; pCfg->precision = pDb->cfg.precision;
pCfg->compression = pDb->cfg.compression; pCfg->compression = pDb->cfg.compression;
......
...@@ -116,6 +116,7 @@ typedef struct SCreateDBInfo { ...@@ -116,6 +116,7 @@ typedef struct SCreateDBInfo {
int32_t daysPerFile; int32_t daysPerFile;
int32_t minRowsPerBlock; int32_t minRowsPerBlock;
int32_t maxRowsPerBlock; int32_t maxRowsPerBlock;
int32_t fsyncPeriod;
int64_t commitTime; int64_t commitTime;
int32_t walLevel; int32_t walLevel;
int32_t compressionLevel; int32_t compressionLevel;
......
...@@ -221,6 +221,7 @@ maxrows(Y) ::= MAXROWS INTEGER(X). { Y = X; } ...@@ -221,6 +221,7 @@ maxrows(Y) ::= MAXROWS INTEGER(X). { Y = X; }
blocks(Y) ::= BLOCKS INTEGER(X). { Y = X; } blocks(Y) ::= BLOCKS INTEGER(X). { Y = X; }
ctime(Y) ::= CTIME INTEGER(X). { Y = X; } ctime(Y) ::= CTIME INTEGER(X). { Y = X; }
wal(Y) ::= WAL INTEGER(X). { Y = X; } wal(Y) ::= WAL INTEGER(X). { Y = X; }
fsync(Y) ::= FSYNC INTEGER(X). { Y = X; }
comp(Y) ::= COMP INTEGER(X). { Y = X; } comp(Y) ::= COMP INTEGER(X). { Y = X; }
prec(Y) ::= PRECISION STRING(X). { Y = X; } prec(Y) ::= PRECISION STRING(X). { Y = X; }
...@@ -236,6 +237,7 @@ db_optr(Y) ::= db_optr(Z) maxrows(X). { Y = Z; Y.maxRowsPerBlock = strtod ...@@ -236,6 +237,7 @@ db_optr(Y) ::= db_optr(Z) maxrows(X). { Y = Z; Y.maxRowsPerBlock = strtod
db_optr(Y) ::= db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) ctime(X). { Y = Z; Y.commitTime = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) ctime(X). { Y = Z; Y.commitTime = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; } db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; } db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
...@@ -249,6 +251,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; } ...@@ -249,6 +251,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtod(X.z, NULL, 10); }
%type typename {TAOS_FIELD} %type typename {TAOS_FIELD}
typename(A) ::= ids(X). { typename(A) ::= ids(X). {
......
...@@ -896,6 +896,7 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) { ...@@ -896,6 +896,7 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
pDBInfo->compressionLevel = -1; pDBInfo->compressionLevel = -1;
pDBInfo->walLevel = -1; pDBInfo->walLevel = -1;
pDBInfo->fsyncPeriod = -1;
pDBInfo->commitTime = -1; pDBInfo->commitTime = -1;
pDBInfo->maxTablesPerVnode = -1; pDBInfo->maxTablesPerVnode = -1;
......
...@@ -124,6 +124,7 @@ static SKeyword keywordTable[] = { ...@@ -124,6 +124,7 @@ static SKeyword keywordTable[] = {
{"CACHE", TK_CACHE}, {"CACHE", TK_CACHE},
{"CTIME", TK_CTIME}, {"CTIME", TK_CTIME},
{"WAL", TK_WAL}, {"WAL", TK_WAL},
{"FSYNC", TK_FSYNC},
{"COMP", TK_COMP}, {"COMP", TK_COMP},
{"PRECISION", TK_PRECISION}, {"PRECISION", TK_PRECISION},
{"LP", TK_LP}, {"LP", TK_LP},
......
此差异已折叠。
...@@ -69,6 +69,7 @@ int32_t vnodeInitResources() { ...@@ -69,6 +69,7 @@ int32_t vnodeInitResources() {
} }
void vnodeCleanupResources() { void vnodeCleanupResources() {
if (tsDnodeVnodesHash != NULL) { if (tsDnodeVnodesHash != NULL) {
taosHashCleanup(tsDnodeVnodesHash); taosHashCleanup(tsDnodeVnodesHash);
tsDnodeVnodesHash = NULL; tsDnodeVnodesHash = NULL;
...@@ -137,7 +138,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -137,7 +138,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return TSDB_CODE_VND_INIT_FAILED; return TSDB_CODE_VND_INIT_FAILED;
} }
vInfo("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel); vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
return code; return code;
...@@ -618,6 +619,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -618,6 +619,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnodeCfg->cfg.fsyncPeriod);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
...@@ -782,6 +784,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -782,6 +784,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint; pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, fsyncPeriod not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint;
cJSON *wals = cJSON_GetObjectItem(root, "wals"); cJSON *wals = cJSON_GetObjectItem(root, "wals");
if (!wals || wals->type != cJSON_Number) { if (!wals || wals->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId); vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "tlog.h" #include "tlog.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tutil.h" #include "tutil.h"
#include "ttimer.h"
#include "taoserror.h" #include "taoserror.h"
#include "twal.h" #include "twal.h"
#include "tqueue.h" #include "tqueue.h"
...@@ -44,6 +45,9 @@ typedef struct { ...@@ -44,6 +45,9 @@ typedef struct {
int fd; int fd;
int keep; int keep;
int level; int level;
int32_t fsyncPeriod;
void *timer;
void *signature;
int max; // maximum number of wal files int max; // maximum number of wal files
uint32_t id; // increase continuously uint32_t id; // increase continuously
int num; // number of wal files int num; // number of wal files
...@@ -52,10 +56,23 @@ typedef struct { ...@@ -52,10 +56,23 @@ typedef struct {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWal; } SWal;
static void *walTmrCtrl = NULL;
static int tsWalNum = 0;
static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT;
static uint32_t walSignature = 0xFAFBFDFE; static uint32_t walSignature = 0xFAFBFDFE;
static int walHandleExistingFiles(const char *path); static int walHandleExistingFiles(const char *path);
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
static int walRemoveWalFiles(const char *path); static int walRemoveWalFiles(const char *path);
static void walProcessFsyncTimer(void *param, void *tmrId);
static void walRelease(SWal *pWal);
static void walModuleInitFunc() {
walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
if (walTmrCtrl == NULL)
walModuleInit = PTHREAD_ONCE_INIT;
else
wDebug("WAL module is initialized");
}
void *walOpen(const char *path, const SWalCfg *pCfg) { void *walOpen(const char *path, const SWalCfg *pCfg) {
SWal *pWal = calloc(sizeof(SWal), 1); SWal *pWal = calloc(sizeof(SWal), 1);
...@@ -64,20 +81,38 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { ...@@ -64,20 +81,38 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
return NULL; return NULL;
} }
pthread_once(&walModuleInit, walModuleInitFunc);
if (walTmrCtrl == NULL) {
free(pWal);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
atomic_add_fetch_32(&tsWalNum, 1);
pWal->fd = -1; pWal->fd = -1;
pWal->max = pCfg->wals; pWal->max = pCfg->wals;
pWal->id = 0; pWal->id = 0;
pWal->num = 0; pWal->num = 0;
pWal->level = pCfg->walLevel; pWal->level = pCfg->walLevel;
pWal->keep = pCfg->keep; pWal->keep = pCfg->keep;
pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->signature = pWal;
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL); pthread_mutex_init(&pWal->mutex, NULL);
if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) {
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
if (pWal->timer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
walRelease(pWal);
return NULL;
}
}
if (tmkdir(path, 0755) != 0) { if (tmkdir(path, 0755) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal:%s, failed to create directory(%s)", path, strerror(errno)); wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
pthread_mutex_destroy(&pWal->mutex); walRelease(pWal);
free(pWal);
pWal = NULL; pWal = NULL;
} }
...@@ -89,12 +124,11 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { ...@@ -89,12 +124,11 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
if (pWal && pWal->fd <0) { if (pWal && pWal->fd <0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal:%s, failed to open(%s)", path, strerror(errno)); wError("wal:%s, failed to open(%s)", path, strerror(errno));
pthread_mutex_destroy(&pWal->mutex); walRelease(pWal);
free(pWal);
pWal = NULL; pWal = NULL;
} }
if (pWal) wDebug("wal:%s, it is open, level:%d", path, pWal->level); if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
return pWal; return pWal;
} }
...@@ -102,7 +136,8 @@ void walClose(void *handle) { ...@@ -102,7 +136,8 @@ void walClose(void *handle) {
if (handle == NULL) return; if (handle == NULL) return;
SWal *pWal = handle; SWal *pWal = handle;
close(pWal->fd); tclose(pWal->fd);
if (pWal->timer) taosTmrStopA(&pWal->timer);
if (pWal->keep == 0) { if (pWal->keep == 0) {
// remove all files in the directory // remove all files in the directory
...@@ -118,9 +153,7 @@ void walClose(void *handle) { ...@@ -118,9 +153,7 @@ void walClose(void *handle) {
wDebug("wal:%s, it is closed and kept", pWal->name); wDebug("wal:%s, it is closed and kept", pWal->name);
} }
pthread_mutex_destroy(&pWal->mutex); walRelease(pWal);
free(pWal);
} }
int walRenew(void *handle) { int walRenew(void *handle) {
...@@ -194,9 +227,9 @@ int walWrite(void *handle, SWalHead *pHead) { ...@@ -194,9 +227,9 @@ int walWrite(void *handle, SWalHead *pHead) {
void walFsync(void *handle) { void walFsync(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL) return; if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) { if (pWal->fsyncPeriod == 0) {
if (fsync(pWal->fd) < 0) { if (fsync(pWal->fd) < 0) {
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
} }
...@@ -303,6 +336,20 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { ...@@ -303,6 +336,20 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
return code; return code;
} }
static void walRelease(SWal *pWal) {
pthread_mutex_destroy(&pWal->mutex);
pWal->signature = NULL;
free(pWal);
if (atomic_sub_fetch_32(&tsWalNum, 1) == 0) {
if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl);
walTmrCtrl = NULL;
walModuleInit = PTHREAD_ONCE_INIT;
wDebug("WAL module is cleaned up");
}
}
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
char *name = pWal->name; char *name = pWal->name;
...@@ -433,3 +480,15 @@ static int walRemoveWalFiles(const char *path) { ...@@ -433,3 +480,15 @@ static int walRemoveWalFiles(const char *path) {
return terrno; return terrno;
} }
static void walProcessFsyncTimer(void *param, void *tmrId) {
SWal *pWal = param;
if (pWal->signature != pWal) return;
if (pWal->fd < 0) return;
if (fsync(pWal->fd) < 0) {
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
}
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
}
...@@ -111,29 +111,29 @@ echo "serverPort ${NODE}" >> $TAOS_CFG ...@@ -111,29 +111,29 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 131" >> $TAOS_CFG echo "debugFlag 131" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 131" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 131" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 131" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG echo "vDebugFlag 131" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG echo "tsdbDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG echo "jnidebugFlag 131" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG echo "odbcdebugFlag 131" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 131" >> $TAOS_CFG echo "monitorDebugFlag 131" >> $TAOS_CFG
echo "mqttDebugFlag 131" >> $TAOS_CFG echo "mqttDebugFlag 131" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG echo "qdebugFlag 131" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 131" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG echo "udebugFlag 131" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG echo "sdebugFlag 131" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG echo "wdebugFlag 131" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG
echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG
echo "defaultPass taosdata" >> $TAOS_CFG echo "defaultPass taosdata" >> $TAOS_CFG
echo "numOfLogLines 10000000" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG
echo "clog 2" >> $TAOS_CFG echo "clog 2" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG
...@@ -145,6 +145,6 @@ echo "tableIncStepPerVnode 10000" >> $TAOS_CFG ...@@ -145,6 +145,6 @@ echo "tableIncStepPerVnode 10000" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG
echo "numOfMnodes 1" >> $TAOS_CFG echo "numOfMnodes 1" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG
echo "anyIp 0" >> $TAOS_CFG echo "fsync 0" >> $TAOS_CFG
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册