提交 a23a1d7b 编写于 作者: D dapan1121

Merge branch 'feature/TD-3950' of https://github.com/taosdata/TDengine into feature/TD-3950

...@@ -102,6 +102,12 @@ IF ("${CPUTYPE}" STREQUAL "") ...@@ -102,6 +102,12 @@ IF ("${CPUTYPE}" STREQUAL "")
SET(TD_LINUX TRUE) SET(TD_LINUX TRUE)
SET(TD_LINUX_64 FALSE) SET(TD_LINUX_64 FALSE)
SET(TD_ARM_64 TRUE) SET(TD_ARM_64 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "mips64")
SET(CPUTYPE "mips64")
MESSAGE(STATUS "Set CPUTYPE to mips64")
SET(TD_LINUX TRUE)
SET(TD_LINUX_64 FALSE)
SET(TD_MIPS_64 TRUE)
ENDIF () ENDIF ()
ELSE () ELSE ()
......
...@@ -307,7 +307,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild); ...@@ -307,7 +307,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta); uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize(); uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name, void* buf); int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, void* buf);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta); STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
......
...@@ -68,14 +68,16 @@ typedef struct CChildTableMeta { ...@@ -68,14 +68,16 @@ typedef struct CChildTableMeta {
int32_t vgId; int32_t vgId;
STableId id; STableId id;
uint8_t tableType; uint8_t tableType;
char sTableName[TSDB_TABLE_FNAME_LEN]; //super table name, not full name char sTableName[TSDB_TABLE_FNAME_LEN]; // TODO: refactor super table name, not full name
uint64_t suid; // super table id
} CChildTableMeta; } CChildTableMeta;
typedef struct STableMeta { typedef struct STableMeta {
int32_t vgId; int32_t vgId;
STableId id; STableId id;
uint8_t tableType; uint8_t tableType;
char sTableName[TSDB_TABLE_FNAME_LEN]; char sTableName[TSDB_TABLE_FNAME_LEN]; // super table name
uint64_t suid; // super table id
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
STableComInfo tableInfo; STableComInfo tableInfo;
......
...@@ -94,6 +94,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) { ...@@ -94,6 +94,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
pTableMeta->tableType = pTableMetaMsg->tableType; pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->vgId = pTableMetaMsg->vgroup.vgId; pTableMeta->vgId = pTableMetaMsg->vgroup.vgId;
pTableMeta->suid = pTableMetaMsg->suid;
pTableMeta->tableInfo = (STableComInfo) { pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags, .numOfTags = pTableMetaMsg->numOfTags,
......
...@@ -1832,8 +1832,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1832,8 +1832,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->tversion = htons(pMetaMsg->tversion);
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->uid = htobe64(pMetaMsg->uid);
pMetaMsg->suid = pMetaMsg->suid;
pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->contLen = htons(pMetaMsg->contLen);
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
...@@ -2453,13 +2453,10 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { ...@@ -2453,13 +2453,10 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pTableMetaInfo->pTableMeta = (STableMeta *)tmp; pTableMetaInfo->pTableMeta = (STableMeta *)tmp;
}
memset(pTableMetaInfo->pTableMeta, 0, size); memset(pTableMetaInfo->pTableMeta, 0, size);
pTableMetaInfo->tableMetaSize = size; pTableMetaInfo->tableMetaSize = size;
} else {
//uint32_t s = tscGetTableMetaSize(pTableMetaInfo->pTableMeta);
memset(pTableMetaInfo->pTableMeta, 0, size);
pTableMetaInfo->tableMetaSize = size;
}
pTableMetaInfo->pTableMeta->tableType = -1; pTableMetaInfo->pTableMeta->tableType = -1;
pTableMetaInfo->pTableMeta->tableInfo.numOfColumns = -1; pTableMetaInfo->pTableMeta->tableInfo.numOfColumns = -1;
...@@ -2476,8 +2473,9 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { ...@@ -2476,8 +2473,9 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
STableMeta* pMeta = pTableMetaInfo->pTableMeta; STableMeta* pMeta = pTableMetaInfo->pTableMeta;
if (pMeta->id.uid > 0) { if (pMeta->id.uid > 0) {
// in case of child table, here only get the
if (pMeta->tableType == TSDB_CHILD_TABLE) { if (pMeta->tableType == TSDB_CHILD_TABLE) {
int32_t code = tscCreateTableMetaFromCChildMeta(pTableMetaInfo->pTableMeta, name, buf); int32_t code = tscCreateTableMetaFromSTableMeta(pTableMetaInfo->pTableMeta, name, buf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return getTableMetaFromMnode(pSql, pTableMetaInfo); return getTableMetaFromMnode(pSql, pTableMetaInfo);
} }
......
...@@ -3370,22 +3370,25 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) { ...@@ -3370,22 +3370,25 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) {
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
CChildTableMeta* cMeta = calloc(1, sizeof(CChildTableMeta)); CChildTableMeta* cMeta = calloc(1, sizeof(CChildTableMeta));
cMeta->tableType = TSDB_CHILD_TABLE; cMeta->tableType = TSDB_CHILD_TABLE;
cMeta->vgId = pTableMeta->vgId; cMeta->vgId = pTableMeta->vgId;
cMeta->id = pTableMeta->id; cMeta->id = pTableMeta->id;
cMeta->suid = pTableMeta->suid;
tstrncpy(cMeta->sTableName, pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN); tstrncpy(cMeta->sTableName, pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
return cMeta; return cMeta;
} }
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name, void* buf) { int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, void* buf) {
assert(pChild != NULL && buf != NULL); assert(pChild != NULL && buf != NULL);
// uint32_t size = tscGetTableMetaMaxSize(); STableMeta* p = buf;
STableMeta* p = buf;//calloc(1, size);
taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1); taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1);
if (p->id.uid > 0) { // tableMeta exists, build child table meta and return
// tableMeta exists, build child table meta according to the super table meta
// the uid need to be checked in addition to the general name of the super table.
if (p->id.uid > 0 && pChild->suid == p->id.uid) {
pChild->sversion = p->sversion; pChild->sversion = p->sversion;
pChild->tversion = p->tversion; pChild->tversion = p->tversion;
...@@ -3393,13 +3396,9 @@ int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name, v ...@@ -3393,13 +3396,9 @@ int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name, v
int32_t total = pChild->tableInfo.numOfColumns + pChild->tableInfo.numOfTags; int32_t total = pChild->tableInfo.numOfColumns + pChild->tableInfo.numOfTags;
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total); memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
// tfree(p);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // super table has been removed, current tableMeta is also expired. remove it here } else { // super table has been removed, current tableMeta is also expired. remove it here
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
// tfree(p);
return -1; return -1;
} }
} }
......
...@@ -142,12 +142,15 @@ extern int32_t tsMonitorInterval; ...@@ -142,12 +142,15 @@ extern int32_t tsMonitorInterval;
extern int8_t tsEnableStream; extern int8_t tsEnableStream;
// internal // internal
extern int8_t tsCompactMnodeWal;
extern int8_t tsPrintAuth; extern int8_t tsPrintAuth;
extern int8_t tscEmbedded; extern int8_t tscEmbedded;
extern char configDir[]; extern char configDir[];
extern char tsVnodeDir[]; extern char tsVnodeDir[];
extern char tsDnodeDir[]; extern char tsDnodeDir[];
extern char tsMnodeDir[]; extern char tsMnodeDir[];
extern char tsMnodeBakDir[];
extern char tsMnodeTmpDir[];
extern char tsDataDir[]; extern char tsDataDir[];
extern char tsLogDir[]; extern char tsLogDir[];
extern char tsScriptDir[]; extern char tsScriptDir[];
......
...@@ -176,12 +176,15 @@ int32_t tsMonitorInterval = 30; // seconds ...@@ -176,12 +176,15 @@ int32_t tsMonitorInterval = 30; // seconds
int8_t tsEnableStream = 1; int8_t tsEnableStream = 1;
// internal // internal
int8_t tsCompactMnodeWal = 0;
int8_t tsPrintAuth = 0; int8_t tsPrintAuth = 0;
int8_t tscEmbedded = 0; int8_t tscEmbedded = 0;
char configDir[TSDB_FILENAME_LEN] = {0}; char configDir[TSDB_FILENAME_LEN] = {0};
char tsVnodeDir[TSDB_FILENAME_LEN] = {0}; char tsVnodeDir[TSDB_FILENAME_LEN] = {0};
char tsDnodeDir[TSDB_FILENAME_LEN] = {0}; char tsDnodeDir[TSDB_FILENAME_LEN] = {0};
char tsMnodeDir[TSDB_FILENAME_LEN] = {0}; char tsMnodeDir[TSDB_FILENAME_LEN] = {0};
char tsMnodeTmpDir[TSDB_FILENAME_LEN] = {0};
char tsMnodeBakDir[TSDB_FILENAME_LEN] = {0};
char tsDataDir[TSDB_FILENAME_LEN] = {0}; char tsDataDir[TSDB_FILENAME_LEN] = {0};
char tsScriptDir[TSDB_FILENAME_LEN] = {0}; char tsScriptDir[TSDB_FILENAME_LEN] = {0};
char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/"; char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/";
......
...@@ -211,6 +211,65 @@ public class TSDBPreparedStatementTest { ...@@ -211,6 +211,65 @@ public class TSDBPreparedStatementTest {
} }
} }
@Test
public void bindDataSelectColumnTest() throws SQLException {
Statement stmt = conn.createStatement();
int numOfRows = 1000;
for (int loop = 0; loop < 10; loop++){
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, f1 nchar(4), f2 float, f3 double, f4 timestamp, f5 int, f6 bool, f7 binary(10))");
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? (ts, f1, f7) values(?, ?, ?)");
Random r = new Random();
s.setTableName("weather_test");
ArrayList<Long> ts = new ArrayList<Long>();
for(int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
int random = 10 + r.nextInt(5);
ArrayList<String> s2 = new ArrayList<String>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
s2.add(null);
}else{
s2.add("分支" + i % 4);
}
}
s.setNString(1, s2, 4);
random = 10 + r.nextInt(5);
ArrayList<String> s5 = new ArrayList<String>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
s5.add(null);
}else{
s5.add("test" + i % 10);
}
}
s.setString(2, s5, 10);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
String sql = "select * from weather_test";
PreparedStatement statement = conn.prepareStatement(sql);
ResultSet rs = statement.executeQuery();
int rows = 0;
while(rs.next()) {
rows++;
}
Assert.assertEquals(numOfRows, rows);
}
}
@Test @Test
public void setBoolean() throws SQLException { public void setBoolean() throws SQLException {
pstmt_insert.setTimestamp(1, new Timestamp(System.currentTimeMillis())); pstmt_insert.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
......
const ref = require('ref-napi'); const ref = require('ref-napi');
require('./globalfunc.js') require('./globalfunc.js')
const CTaosInterface = require('./cinterface') const CTaosInterface = require('./cinterface')
const errors = require ('./error') const errors = require('./error')
const TaosQuery = require('./taosquery') const TaosQuery = require('./taosquery')
const { PerformanceObserver, performance } = require('perf_hooks'); const { PerformanceObserver, performance } = require('perf_hooks');
module.exports = TDengineCursor; module.exports = TDengineCursor;
...@@ -22,7 +22,7 @@ module.exports = TDengineCursor; ...@@ -22,7 +22,7 @@ module.exports = TDengineCursor;
* @property {fields} - Array of the field objects in order from left to right of the latest data retrieved * @property {fields} - Array of the field objects in order from left to right of the latest data retrieved
* @since 1.0.0 * @since 1.0.0
*/ */
function TDengineCursor(connection=null) { function TDengineCursor(connection = null) {
//All parameters are store for sync queries only. //All parameters are store for sync queries only.
this._rowcount = -1; this._rowcount = -1;
this._connection = null; this._connection = null;
...@@ -176,27 +176,22 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) { ...@@ -176,27 +176,22 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
throw new errors.OperationalError("Invalid use of fetchall, either result or fields from query are null. First execute a query first"); throw new errors.OperationalError("Invalid use of fetchall, either result or fields from query are null. First execute a query first");
} }
let data = []; let num_of_rows = this._chandle.affectedRows(this._result);
let data = new Array(num_of_rows);
this._rowcount = 0; this._rowcount = 0;
//let nodetime = 0;
let time = 0; let time = 0;
const obs = new PerformanceObserver((items) => { const obs = new PerformanceObserver((items) => {
time += items.getEntries()[0].duration; time += items.getEntries()[0].duration;
performance.clearMarks(); performance.clearMarks();
}); });
/*
const obs2 = new PerformanceObserver((items) => {
nodetime += items.getEntries()[0].duration;
performance.clearMarks();
});
obs2.observe({ entryTypes: ['measure'] });
performance.mark('nodea');
*/
obs.observe({ entryTypes: ['measure'] }); obs.observe({ entryTypes: ['measure'] });
performance.mark('A'); performance.mark('A');
while(true) { while (true) {
let blockAndRows = this._chandle.fetchBlock(this._result, this._fields); let blockAndRows = this._chandle.fetchBlock(this._result, this._fields);
// console.log(blockAndRows);
// break;
let block = blockAndRows.blocks; let block = blockAndRows.blocks;
let num_of_rows = blockAndRows.num_of_rows; let num_of_rows = blockAndRows.num_of_rows;
if (num_of_rows == 0) { if (num_of_rows == 0) {
...@@ -205,16 +200,18 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) { ...@@ -205,16 +200,18 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
this._rowcount += num_of_rows; this._rowcount += num_of_rows;
let numoffields = this._fields.length; let numoffields = this._fields.length;
for (let i = 0; i < num_of_rows; i++) { for (let i = 0; i < num_of_rows; i++) {
data.push([]); // data.push([]);
let rowBlock = new Array(numoffields); let rowBlock = new Array(numoffields);
for (let j = 0; j < numoffields; j++) { for (let j = 0; j < numoffields; j++) {
rowBlock[j] = block[j][i]; rowBlock[j] = block[j][i];
} }
data[data.length-1] = (rowBlock); data[this._rowcount - num_of_rows + i] = (rowBlock);
// data.push(rowBlock);
} }
} }
performance.mark('B'); performance.mark('B');
performance.measure('query', 'A', 'B'); performance.measure('query', 'A', 'B');
let response = this._createSetResponse(this._rowcount, time) let response = this._createSetResponse(this._rowcount, time)
...@@ -239,7 +236,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) { ...@@ -239,7 +236,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
* @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query * @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query
* @since 1.0.0 * @since 1.0.0
*/ */
TDengineCursor.prototype.execute_a = function execute_a (operation, options, callback, param) { TDengineCursor.prototype.execute_a = function execute_a(operation, options, callback, param) {
if (operation == undefined) { if (operation == undefined) {
throw new errors.ProgrammingError('No operation passed as argument'); throw new errors.ProgrammingError('No operation passed as argument');
return null; return null;
...@@ -265,14 +262,14 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal ...@@ -265,14 +262,14 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal
} }
if (resCode >= 0) { if (resCode >= 0) {
// let fieldCount = cr._chandle.numFields(res2); // let fieldCount = cr._chandle.numFields(res2);
// if (fieldCount == 0) { // if (fieldCount == 0) {
// //cr._chandle.freeResult(res2); // //cr._chandle.freeResult(res2);
// return res2; // return res2;
// } // }
// else { // else {
// return res2; // return res2;
// } // }
return res2; return res2;
} }
...@@ -360,17 +357,17 @@ TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callb ...@@ -360,17 +357,17 @@ TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callb
for (let k = 0; k < fields.length; k++) { for (let k = 0; k < fields.length; k++) {
rowBlock[k] = block[k][j]; rowBlock[k] = block[k][j];
} }
data[data.length-1] = rowBlock; data[data.length - 1] = rowBlock;
} }
} }
cr._chandle.freeResult(result2); // free result, avoid seg faults and mem leaks! cr._chandle.freeResult(result2); // free result, avoid seg faults and mem leaks!
callback(param2, result2, numOfRows2, {data:data,fields:fields}); callback(param2, result2, numOfRows2, { data: data, fields: fields });
} }
} }
ref.writeObject(buf, 0, param); ref.writeObject(buf, 0, param);
param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param
return {param:param,result:result}; return { param: param, result: result };
} }
/** /**
* Stop a query given the result handle. * Stop a query given the result handle.
...@@ -428,7 +425,7 @@ TDengineCursor.prototype.subscribe = function subscribe(config) { ...@@ -428,7 +425,7 @@ TDengineCursor.prototype.subscribe = function subscribe(config) {
*/ */
TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) { TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) {
while (true) { while (true) {
let { data, fields, result} = this._chandle.consume(subscription); let { data, fields, result } = this._chandle.consume(subscription);
callback(data, fields, result); callback(data, fields, result);
} }
} }
...@@ -450,7 +447,7 @@ TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) { ...@@ -450,7 +447,7 @@ TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
* @return {Buffer} A buffer pointing to the stream handle * @return {Buffer} A buffer pointing to the stream handle
* @since 1.3.0 * @since 1.3.0
*/ */
TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) { TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) {
let buf = ref.alloc('Object'); let buf = ref.alloc('Object');
ref.writeObject(buf, 0, param); ref.writeObject(buf, 0, param);
...@@ -463,17 +460,17 @@ TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) { ...@@ -463,17 +460,17 @@ TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
for (let k = 0; k < fields.length; k++) { for (let k = 0; k < fields.length; k++) {
rowBlock[k] = blocks[k][j]; rowBlock[k] = blocks[k][j];
} }
data[data.length-1] = rowBlock; data[data.length - 1] = rowBlock;
} }
callback(param2, result2, blocks, fields); callback(param2, result2, blocks, fields);
} }
return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf); return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf);
} }
/** /**
* Close a stream * Close a stream
* @param {Buffer} - A buffer pointing to the handle of the stream to be closed * @param {Buffer} - A buffer pointing to the handle of the stream to be closed
* @since 1.3.0 * @since 1.3.0
*/ */
TDengineCursor.prototype.closeStream = function closeStream(stream) { TDengineCursor.prototype.closeStream = function closeStream(stream) {
this._chandle.closeStream(stream); this._chandle.closeStream(stream);
} }
{
"name": "td2.0-connector",
"version": "2.0.6",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"array-index": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/array-index/-/array-index-1.0.0.tgz",
"integrity": "sha1-7FanSe4QPk4Ix5C5w1PfFgVbl/k=",
"requires": {
"debug": "^2.2.0",
"es6-symbol": "^3.0.2"
},
"dependencies": {
"debug": {
"version": "2.6.9",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz",
"integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==",
"requires": {
"ms": "2.0.0"
}
},
"ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
}
}
},
"d": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz",
"integrity": "sha512-m62ShEObQ39CfralilEQRjH6oAMtNCV1xJyEx5LpRYUVN+EviphDgUc/F3hnYbADmkiNs67Y+3ylmlG7Lnu+FA==",
"requires": {
"es5-ext": "^0.10.50",
"type": "^1.0.1"
}
},
"debug": {
"version": "4.3.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
"integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
"requires": {
"ms": "2.1.2"
}
},
"es5-ext": {
"version": "0.10.53",
"resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.53.tgz",
"integrity": "sha512-Xs2Stw6NiNHWypzRTY1MtaG/uJlwCk8kH81920ma8mvN8Xq1gsfhZvpkImLQArw8AHnv8MT2I45J3c0R8slE+Q==",
"requires": {
"es6-iterator": "~2.0.3",
"es6-symbol": "~3.1.3",
"next-tick": "~1.0.0"
}
},
"es6-iterator": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/es6-iterator/-/es6-iterator-2.0.3.tgz",
"integrity": "sha1-p96IkUGgWpSwhUQDstCg+/qY87c=",
"requires": {
"d": "1",
"es5-ext": "^0.10.35",
"es6-symbol": "^3.1.1"
}
},
"es6-symbol": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/es6-symbol/-/es6-symbol-3.1.3.tgz",
"integrity": "sha512-NJ6Yn3FuDinBaBRWl/q5X/s4koRHBrgKAu+yGI6JCBeiu3qrcbJhwT2GeR/EXVfylRk8dpQVJoLEFhK+Mu31NA==",
"requires": {
"d": "^1.0.1",
"ext": "^1.1.2"
}
},
"ext": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/ext/-/ext-1.4.0.tgz",
"integrity": "sha512-Key5NIsUxdqKg3vIsdw9dSuXpPCQ297y6wBjL30edxwPgt2E44WcWBZey/ZvUc6sERLTxKdyCu4gZFmUbk1Q7A==",
"requires": {
"type": "^2.0.0"
},
"dependencies": {
"type": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/type/-/type-2.1.0.tgz",
"integrity": "sha512-G9absDWvhAWCV2gmF1zKud3OyC61nZDwWvBL2DApaVFogI07CprggiQAOOjvp2NRjYWFzPyu7vwtDrQFq8jeSA=="
}
}
},
"ffi-napi": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/ffi-napi/-/ffi-napi-3.1.0.tgz",
"integrity": "sha512-EsHO+sP2p/nUC/3l/l8m9niee1BLm4asUFDzkkBGR4kYVgp2KqdAYUomZhkKtzim4Fq7mcYHjpUaIHsMqs+E1g==",
"requires": {
"debug": "^4.1.1",
"get-uv-event-loop-napi-h": "^1.0.5",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1",
"ref-napi": "^2.0.1",
"ref-struct-di": "^1.1.0"
},
"dependencies": {
"ref-napi": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-2.1.2.tgz",
"integrity": "sha512-aFl+vrIuLWUXMUTQGAwGAuSNLX3Ub5W3iVP8b7KyFFZUdn4+i4U1TXXTop0kCTUfGNu8glBGVz4lowkwMcPVVA==",
"requires": {
"debug": "^4.1.1",
"get-symbol-from-current-process-h": "^1.0.2",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
}
}
}
},
"get-symbol-from-current-process-h": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/get-symbol-from-current-process-h/-/get-symbol-from-current-process-h-1.0.2.tgz",
"integrity": "sha512-syloC6fsCt62ELLrr1VKBM1ggOpMdetX9hTrdW77UQdcApPHLmf7CI7OKcN1c9kYuNxKcDe4iJ4FY9sX3aw2xw=="
},
"get-uv-event-loop-napi-h": {
"version": "1.0.6",
"resolved": "https://registry.npmjs.org/get-uv-event-loop-napi-h/-/get-uv-event-loop-napi-h-1.0.6.tgz",
"integrity": "sha512-t5c9VNR84nRoF+eLiz6wFrEp1SE2Acg0wS+Ysa2zF0eROes+LzOfuTaVHxGy8AbS8rq7FHEJzjnCZo1BupwdJg==",
"requires": {
"get-symbol-from-current-process-h": "^1.0.1"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"next-tick": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/next-tick/-/next-tick-1.0.0.tgz",
"integrity": "sha1-yobR/ogoFpsBICCOPchCS524NCw="
},
"node-addon-api": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-2.0.2.tgz",
"integrity": "sha512-Ntyt4AIXyaLIuMHF6IOoTakB3K+RWxwtsHNRxllEoA6vPwP9o4866g6YWDLUdnucilZhmkxiHwHr11gAENw+QA=="
},
"node-gyp-build": {
"version": "4.2.3",
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.2.3.tgz",
"integrity": "sha512-MN6ZpzmfNCRM+3t57PTJHgHyw/h4OWnZ6mR8P5j/uZtqQr46RRuDE/P+g3n0YR/AiYXeWixZZzaip77gdICfRg=="
},
"ref-array-napi": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/ref-array-napi/-/ref-array-napi-1.2.1.tgz",
"integrity": "sha512-jQp2WWSucmxkqVfoNfm7yDlDeGu3liAbzqfwjNybL80ooLOCnCZpAK2woDInY+lxNOK/VlIVSqeDEYb4gVPuNQ==",
"requires": {
"array-index": "1",
"debug": "2",
"ref-napi": "^1.4.2"
},
"dependencies": {
"debug": {
"version": "2.6.9",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz",
"integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==",
"requires": {
"ms": "2.0.0"
}
},
"ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
},
"ref-napi": {
"version": "1.5.2",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-1.5.2.tgz",
"integrity": "sha512-hwyNmWpUkt1bDWDW4aiwCoC+SJfJO69UIdjqssNqdaS0sYJpgqzosGg/rLtk69UoQ8drZdI9yyQefM7eEMM3Gw==",
"requires": {
"debug": "^3.1.0",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
},
"dependencies": {
"debug": {
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz",
"integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==",
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="
}
}
}
}
},
"ref-napi": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-3.0.1.tgz",
"integrity": "sha512-W3rcb0E+tlO9u9ySFnX5vifInwwPGToOfFgTZUHJBNiOBsW0NNvgHz2zJN7ctABo/2yIlgdPQUvuqqfORIF4LA==",
"requires": {
"debug": "^4.1.1",
"get-symbol-from-current-process-h": "^1.0.2",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
}
},
"ref-struct-di": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ref-struct-di/-/ref-struct-di-1.1.1.tgz",
"integrity": "sha512-2Xyn/0Qgz89VT+++WP0sTosdm9oeowLP23wRJYhG4BFdMUrLj3jhwHZNEytYNYgtPKLNTP3KJX4HEgBvM1/Y2g==",
"requires": {
"debug": "^3.1.0"
},
"dependencies": {
"debug": {
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz",
"integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==",
"requires": {
"ms": "^2.1.1"
}
}
}
},
"ref-struct-napi": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ref-struct-napi/-/ref-struct-napi-1.1.1.tgz",
"integrity": "sha512-YgS5/d7+kT5zgtySYI5ieH0hREdv+DabgDvoczxsui0f9VLm0rrDcWEj4DHKehsH+tJnVMsLwuyctWgvdEcVRw==",
"requires": {
"debug": "2",
"ref-napi": "^1.4.2"
},
"dependencies": {
"debug": {
"version": "2.6.9",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz",
"integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==",
"requires": {
"ms": "2.0.0"
}
},
"ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
},
"ref-napi": {
"version": "1.5.2",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-1.5.2.tgz",
"integrity": "sha512-hwyNmWpUkt1bDWDW4aiwCoC+SJfJO69UIdjqssNqdaS0sYJpgqzosGg/rLtk69UoQ8drZdI9yyQefM7eEMM3Gw==",
"requires": {
"debug": "^3.1.0",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
},
"dependencies": {
"debug": {
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz",
"integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==",
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="
}
}
}
}
},
"type": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/type/-/type-1.2.0.tgz",
"integrity": "sha512-+5nt5AAniqsCnu2cEQQdpzCAh33kVx8n0VoFidKpB1dVVLAN/F+bgVOqOJqOnEnrhp222clB5p3vUlD+1QAnfg=="
}
}
}
{ {
"name": "td2.0-connector", "name": "td2.0-connector",
"version": "2.0.6", "version": "2.0.7",
"description": "A Node.js connector for TDengine.", "description": "A Node.js connector for TDengine.",
"main": "tdengine.js", "main": "tdengine.js",
"directories": { "directories": {
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeTelemetry.h" #include "dnodeTelemetry.h"
#include "module.h" #include "module.h"
#include "mnode.h"
#if !defined(_MODULE) || !defined(_TD_LINUX) #if !defined(_MODULE) || !defined(_TD_LINUX)
int32_t moduleStart() { return 0; } int32_t moduleStart() { return 0; }
...@@ -216,6 +217,17 @@ static int32_t dnodeInitStorage() { ...@@ -216,6 +217,17 @@ static int32_t dnodeInitStorage() {
sprintf(tsDnodeDir, "%s/dnode", tsDataDir); sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
// sprintf(tsVnodeBakDir, "%s/vnode_bak", tsDataDir); // sprintf(tsVnodeBakDir, "%s/vnode_bak", tsDataDir);
if (tsCompactMnodeWal == 1) {
sprintf(tsMnodeTmpDir, "%s/mnode_tmp", tsDataDir);
tfsRmdir(tsMnodeTmpDir);
if (dnodeCreateDir(tsMnodeTmpDir) < 0) {
dError("failed to create dir: %s, reason: %s", tsMnodeTmpDir, strerror(errno));
return -1;
}
sprintf(tsMnodeBakDir, "%s/mnode_bak", tsDataDir);
//tfsRmdir(tsMnodeBakDir);
}
//TODO(dengyihao): no need to init here //TODO(dengyihao): no need to init here
if (dnodeCreateDir(tsMnodeDir) < 0) { if (dnodeCreateDir(tsMnodeDir) < 0) {
dError("failed to create dir: %s, reason: %s", tsMnodeDir, strerror(errno)); dError("failed to create dir: %s, reason: %s", tsMnodeDir, strerror(errno));
......
...@@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) {
} }
} else if (strcmp(argv[i], "-C") == 0) { } else if (strcmp(argv[i], "-C") == 0) {
dump_config = 1; dump_config = 1;
} else if (strcmp(argv[i], "--compact-mnode-wal") == 0) {
tsCompactMnodeWal = 1;
} else if (strcmp(argv[i], "-V") == 0) { } else if (strcmp(argv[i], "-V") == 0) {
#ifdef _ACCT #ifdef _ACCT
char *versionStr = "enterprise"; char *versionStr = "enterprise";
......
...@@ -73,6 +73,9 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg); ...@@ -73,6 +73,9 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg);
void mnodeProcessPeerRsp(SRpcMsg *pMsg); void mnodeProcessPeerRsp(SRpcMsg *pMsg);
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mnodeCompactWal();
int32_t mnodeCompactComponents();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -54,7 +54,8 @@ void monCleanupSystem(); ...@@ -54,7 +54,8 @@ void monCleanupSystem();
void monSaveAcctLog(SAcctMonitorObj *pMonObj); void monSaveAcctLog(SAcctMonitorObj *pMonObj);
void monSaveLog(int32_t level, const char *const format, ...); void monSaveLog(int32_t level, const char *const format, ...);
void monExecuteSQL(char *sql); void monExecuteSQL(char *sql);
typedef void (*MonExecuteSQLCbFP)(void *param, TAOS_RES *, int code);
void monExecuteSQLWithResultCallback(char *sql, MonExecuteSQLCbFP callback, void* param);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -427,6 +427,9 @@ int32_t* taosGetErrno(); ...@@ -427,6 +427,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FS_INVLD_LEVEL TAOS_DEF_ERROR_CODE(0, 0x2207) //"tfs invalid level") #define TSDB_CODE_FS_INVLD_LEVEL TAOS_DEF_ERROR_CODE(0, 0x2207) //"tfs invalid level")
#define TSDB_CODE_FS_NO_VALID_DISK TAOS_DEF_ERROR_CODE(0, 0x2208) //"tfs no valid disk") #define TSDB_CODE_FS_NO_VALID_DISK TAOS_DEF_ERROR_CODE(0, 0x2208) //"tfs no valid disk")
// monitor
#define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection")
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
{ {
"filetype":"subscribe", "filetype": "subscribe",
"cfgdir": "/etc/taos", "cfgdir": "/etc/taos",
"host": "127.0.0.1", "host": "127.0.0.1",
"port": 6030, "port": 6030,
......
此差异已折叠。
...@@ -35,6 +35,8 @@ void mnodeDropDbFromAcct(SAcctObj *pAcct, SDbObj *pDb); ...@@ -35,6 +35,8 @@ void mnodeDropDbFromAcct(SAcctObj *pAcct, SDbObj *pDb);
void mnodeAddUserToAcct(SAcctObj *pAcct, SUserObj *pUser); void mnodeAddUserToAcct(SAcctObj *pAcct, SUserObj *pUser);
void mnodeDropUserFromAcct(SAcctObj *pAcct, SUserObj *pUser); void mnodeDropUserFromAcct(SAcctObj *pAcct, SUserObj *pUser);
int32_t mnodeCompactAccts();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -25,6 +25,8 @@ void mnodeCleanupCluster(); ...@@ -25,6 +25,8 @@ void mnodeCleanupCluster();
void mnodeUpdateClusterId(); void mnodeUpdateClusterId();
const char* mnodeGetClusterId(); const char* mnodeGetClusterId();
int32_t mnodeCompactCluster();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -41,6 +41,8 @@ void mnodeDecDbRef(SDbObj *pDb); ...@@ -41,6 +41,8 @@ void mnodeDecDbRef(SDbObj *pDb);
bool mnodeCheckIsMonitorDB(char *db, char *monitordb); bool mnodeCheckIsMonitorDB(char *db, char *monitordb);
void mnodeDropAllDbs(SAcctObj *pAcct); void mnodeDropAllDbs(SAcctObj *pAcct);
int32_t mnodeCompactDbs();
// util func // util func
void mnodeAddSuperTableIntoDb(SDbObj *pDb); void mnodeAddSuperTableIntoDb(SDbObj *pDb);
void mnodeRemoveSuperTableFromDb(SDbObj *pDb); void mnodeRemoveSuperTableFromDb(SDbObj *pDb);
......
...@@ -77,6 +77,7 @@ void * mnodeGetDnodeByEp(char *ep); ...@@ -77,6 +77,7 @@ void * mnodeGetDnodeByEp(char *ep);
void mnodeUpdateDnode(SDnodeObj *pDnode); void mnodeUpdateDnode(SDnodeObj *pDnode);
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg); int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg);
int32_t mnodeCompactDnodes();
extern int32_t tsAccessSquence; extern int32_t tsAccessSquence;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -50,6 +50,7 @@ char* mnodeGetMnodeMasterEp(); ...@@ -50,6 +50,7 @@ char* mnodeGetMnodeMasterEp();
void mnodeGetMnodeInfos(void *mnodes); void mnodeGetMnodeInfos(void *mnodes);
void mnodeUpdateMnodeEpSet(SMInfos *pMnodes); void mnodeUpdateMnodeEpSet(SMInfos *pMnodes);
int32_t mnodeCompactMnodes();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -92,6 +92,7 @@ void sdbUpdateMnodeRoles(); ...@@ -92,6 +92,7 @@ void sdbUpdateMnodeRoles();
int32_t sdbGetReplicaNum(); int32_t sdbGetReplicaNum();
int32_t sdbInsertRow(SSdbRow *pRow); int32_t sdbInsertRow(SSdbRow *pRow);
int32_t sdbInsertCompactRow(SSdbRow *pRow);
int32_t sdbDeleteRow(SSdbRow *pRow); int32_t sdbDeleteRow(SSdbRow *pRow);
int32_t sdbUpdateRow(SSdbRow *pRow); int32_t sdbUpdateRow(SSdbRow *pRow);
int32_t sdbInsertRowToQueue(SSdbRow *pRow); int32_t sdbInsertRowToQueue(SSdbRow *pRow);
...@@ -106,6 +107,7 @@ int32_t sdbGetId(void *pTable); ...@@ -106,6 +107,7 @@ int32_t sdbGetId(void *pTable);
uint64_t sdbGetVersion(); uint64_t sdbGetVersion();
bool sdbCheckRowDeleted(void *pTable, void *pRow); bool sdbCheckRowDeleted(void *pTable, void *pRow);
int32_t mnodeCompactWal();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -36,6 +36,7 @@ void mnodeCancelGetNextSuperTable(void *pIter); ...@@ -36,6 +36,7 @@ void mnodeCancelGetNextSuperTable(void *pIter);
void mnodeDropAllChildTables(SDbObj *pDropDb); void mnodeDropAllChildTables(SDbObj *pDropDb);
void mnodeDropAllSuperTables(SDbObj *pDropDb); void mnodeDropAllSuperTables(SDbObj *pDropDb);
void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup); void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup);
int32_t mnodeCompactTables();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,6 +33,8 @@ char * mnodeGetUserFromMsg(void *pMnodeMsg); ...@@ -33,6 +33,8 @@ char * mnodeGetUserFromMsg(void *pMnodeMsg);
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg); int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg);
void mnodeDropAllUsers(SAcctObj *pAcct); void mnodeDropAllUsers(SAcctObj *pAcct);
int32_t mnodeCompactUsers();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -32,6 +32,7 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb); ...@@ -32,6 +32,7 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb);
void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb); void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb);
void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode); void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode);
//void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb); //void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb);
int32_t mnodeCompactVgroups();
void * mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup); void * mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup);
void mnodeCancelGetNextVgroup(void *pIter); void mnodeCancelGetNextVgroup(void *pIter);
......
...@@ -238,6 +238,32 @@ static int32_t mnodeCreateRootAcct() { ...@@ -238,6 +238,32 @@ static int32_t mnodeCreateRootAcct() {
return sdbInsertRow(&row); return sdbInsertRow(&row);
} }
int32_t mnodeCompactAccts() {
void *pIter = NULL;
SAcctObj *pAcct = NULL;
mInfo("start to compact accts table...");
while (1) {
pIter = mnodeGetNextAcct(pIter, &pAcct);
if (pAcct == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsAcctSdb,
.pObj = pAcct,
};
mInfo("compact accts %s", pAcct->user);
sdbInsertCompactRow(&row);
}
mInfo("end to compact accts table...");
return 0;
}
#ifndef _ACCT #ifndef _ACCT
int32_t acctInit() { return TSDB_CODE_SUCCESS; } int32_t acctInit() { return TSDB_CODE_SUCCESS; }
......
...@@ -237,3 +237,27 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, ...@@ -237,3 +237,27 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows,
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
int32_t mnodeCompactCluster() {
SClusterObj *pCluster = NULL;
void *pIter;
mInfo("start to compact cluster table...");
pIter = mnodeGetNextCluster(NULL, &pCluster);
while (pCluster) {
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsClusterSdb,
.pObj = pCluster,
};
sdbInsertCompactRow(&row);
pIter = mnodeGetNextCluster(pIter, &pCluster);
}
mInfo("end to compact cluster table...");
return 0;
}
\ No newline at end of file
...@@ -389,7 +389,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -389,7 +389,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->compression < 0) pCfg->compression = tsCompression; if (pCfg->compression < 0) pCfg->compression = tsCompression;
if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL; if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL;
if (pCfg->replications < 0) pCfg->replications = tsReplications; if (pCfg->replications < 0) pCfg->replications = tsReplications;
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum; if (pCfg->quorum < 0) pCfg->quorum = MIN(tsQuorum, pCfg->replications);
if (pCfg->update < 0) pCfg->update = tsUpdate; if (pCfg->update < 0) pCfg->update = tsUpdate;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow; if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
if (pCfg->dbType < 0) pCfg->dbType = 0; if (pCfg->dbType < 0) pCfg->dbType = 0;
...@@ -1271,3 +1271,30 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { ...@@ -1271,3 +1271,30 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs); mInfo("acct:%s, all dbs:%d is dropped from sdb", pAcct->user, numOfDbs);
} }
int32_t mnodeCompactDbs() {
void *pIter = NULL;
SDbObj *pDb = NULL;
mInfo("start to compact dbs table...");
while (1) {
pIter = mnodeGetNextDb(pIter, &pDb);
if (pDb == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb,
.pObj = pDb,
.rowSize = sizeof(SDbObj),
};
mInfo("compact dbs %s", pDb->name);
sdbInsertCompactRow(&row);
}
mInfo("end to compact dbs table...");
return 0;
}
\ No newline at end of file
...@@ -1270,3 +1270,30 @@ char* dnodeRoles[] = { ...@@ -1270,3 +1270,30 @@ char* dnodeRoles[] = {
"vnode", "vnode",
"any" "any"
}; };
int32_t mnodeCompactDnodes() {
SDnodeObj *pDnode = NULL;
void * pIter = NULL;
mInfo("start to compact dnodes table...");
while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDnodeSdb,
.pObj = pDnode,
.rowSize = sizeof(SDnodeObj),
};
mInfo("compact dnode %d", pDnode->dnodeId);
sdbInsertCompactRow(&row);
}
mInfo("end to compact dnodes table...");
return 0;
}
\ No newline at end of file
...@@ -57,6 +57,18 @@ static SStep tsMnodeSteps[] = { ...@@ -57,6 +57,18 @@ static SStep tsMnodeSteps[] = {
{"show", mnodeInitShow, mnodeCleanUpShow} {"show", mnodeInitShow, mnodeCleanUpShow}
}; };
static SStep tsMnodeCompactSteps[] = {
{"cluster", mnodeCompactCluster, NULL},
{"dnodes", mnodeCompactDnodes, NULL},
{"mnodes", mnodeCompactMnodes, NULL},
{"accts", mnodeCompactAccts, NULL},
{"users", mnodeCompactUsers, NULL},
{"dbs", mnodeCompactDbs, NULL},
{"vgroups", mnodeCompactVgroups, NULL},
{"tables", mnodeCompactTables, NULL},
};
static void mnodeInitTimer(); static void mnodeInitTimer();
static void mnodeCleanupTimer(); static void mnodeCleanupTimer();
static bool mnodeNeedStart() ; static bool mnodeNeedStart() ;
...@@ -71,6 +83,11 @@ static int32_t mnodeInitComponents() { ...@@ -71,6 +83,11 @@ static int32_t mnodeInitComponents() {
return dnodeStepInit(tsMnodeSteps, stepSize); return dnodeStepInit(tsMnodeSteps, stepSize);
} }
int32_t mnodeCompactComponents() {
int32_t stepSize = sizeof(tsMnodeCompactSteps) / sizeof(SStep);
return dnodeStepInit(tsMnodeCompactSteps, stepSize);
}
int32_t mnodeStartSystem() { int32_t mnodeStartSystem() {
if (tsMgmtIsRunning) { if (tsMgmtIsRunning) {
mInfo("mnode module already started..."); mInfo("mnode module already started...");
......
...@@ -566,3 +566,30 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -566,3 +566,30 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
return numOfRows; return numOfRows;
} }
int32_t mnodeCompactMnodes() {
void *pIter = NULL;
SMnodeObj *pMnode = NULL;
mInfo("start to compact mnodes table...");
while (1) {
pIter = mnodeGetNextMnode(pIter, &pMnode);
if (pMnode == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsMnodeSdb,
.pObj = pMnode,
.rowSize = sizeof(SMnodeObj),
};
mInfo("compact mnode %d", pMnode->mnodeId);
sdbInsertCompactRow(&row);
}
mInfo("end to compact mnodes table...");
return 0;
}
\ No newline at end of file
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "tutil.h" #include "tutil.h"
#include "tref.h" #include "tref.h"
#include "tbn.h" #include "tbn.h"
#include "tfs.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
...@@ -450,6 +451,12 @@ int32_t sdbInit() { ...@@ -450,6 +451,12 @@ int32_t sdbInit() {
} }
tsSdbMgmt.status = SDB_STATUS_SERVING; tsSdbMgmt.status = SDB_STATUS_SERVING;
if (tsCompactMnodeWal) {
mnodeCompactWal();
exit(EXIT_SUCCESS);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -726,6 +733,12 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * ...@@ -726,6 +733,12 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
} }
} }
int32_t sdbInsertCompactRow(SSdbRow *pRow) {
SSdbTable *pTable = pRow->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
return sdbWriteRowToQueue(pRow, SDB_ACTION_INSERT);
}
int32_t sdbInsertRow(SSdbRow *pRow) { int32_t sdbInsertRow(SSdbRow *pRow) {
SSdbTable *pTable = pRow->pTable; SSdbTable *pTable = pRow->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
...@@ -1138,3 +1151,46 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1138,3 +1151,46 @@ static void *sdbWorkerFp(void *pWorker) {
int32_t sdbGetReplicaNum() { int32_t sdbGetReplicaNum() {
return tsSdbMgmt.cfg.replica; return tsSdbMgmt.cfg.replica;
} }
int32_t mnodeCompactWal() {
sdbInfo("vgId:1, start compact mnode wal...");
// close old wal
walFsync(tsSdbMgmt.wal, true);
walClose(tsSdbMgmt.wal);
// reset version,then compacted wal log can start from version 1
tsSdbMgmt.version = 0;
// change wal to wal_tmp dir
SWalCfg walCfg = {.vgId = 1, .walLevel = TAOS_WAL_FSYNC, .keep = TAOS_WAL_KEEP, .fsyncPeriod = 0};
char temp[TSDB_FILENAME_LEN] = {0};
sprintf(temp, "%s/wal", tsMnodeTmpDir);
tsSdbMgmt.wal = walOpen(temp, &walCfg);
walRenew(tsSdbMgmt.wal);
// compact memory tables info to wal tmp dir
if (mnodeCompactComponents() != 0) {
tfsRmdir(tsMnodeTmpDir);
return -1;
}
// close wal
walFsync(tsSdbMgmt.wal, true);
walClose(tsSdbMgmt.wal);
// rename old wal to wal_bak
if (taosRename(tsMnodeDir, tsMnodeBakDir) != 0) {
return -1;
}
// rename wal_tmp to wal
if (taosRename(tsMnodeTmpDir, tsMnodeDir) != 0) {
return -1;
}
// del wal_tmp dir
sdbInfo("vgId:1, compact mnode wal success");
return 0;
}
\ No newline at end of file
...@@ -3242,3 +3242,65 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3242,3 +3242,65 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
return numOfRows; return numOfRows;
} }
static int32_t mnodeCompactSuperTables() {
void *pIter = NULL;
SSTableObj *pTable = NULL;
mInfo("start to compact super table...");
while (1) {
pIter = mnodeGetNextSuperTable(pIter, &pTable);
if (pTable == NULL) break;
int32_t schemaSize = (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pTable,
.rowSize = sizeof(SSTableObj) + schemaSize,
};
mInfo("compact super %" PRIu64, pTable->uid);
sdbInsertCompactRow(&row);
}
mInfo("end to compact super table...");
return 0;
}
static int32_t mnodeCompactChildTables() {
void *pIter = NULL;
SCTableObj *pTable = NULL;
mInfo("start to compact child table...");
while (1) {
pIter = mnodeGetNextChildTable(pIter, &pTable);
if (pTable == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.pTable = tsChildTableSdb,
};
mInfo("compact child %" PRIu64 ":%d", pTable->uid, pTable->tid);
sdbInsertCompactRow(&row);
}
mInfo("end to compact child table...");
return 0;
}
int32_t mnodeCompactTables() {
mnodeCompactSuperTables();
mnodeCompactChildTables();
return 0;
}
\ No newline at end of file
...@@ -617,3 +617,30 @@ static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg) { ...@@ -617,3 +617,30 @@ static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg) {
return mnodeRetriveAuth(pAuthMsg->user, &pAuthRsp->spi, &pAuthRsp->encrypt, pAuthRsp->secret, pAuthRsp->ckey); return mnodeRetriveAuth(pAuthMsg->user, &pAuthRsp->spi, &pAuthRsp->encrypt, pAuthRsp->secret, pAuthRsp->ckey);
} }
int32_t mnodeCompactUsers() {
void *pIter = NULL;
SUserObj *pUser = NULL;
mInfo("start to compact users table...");
while (1) {
pIter = mnodeGetNextUser(pIter, &pUser);
if (pUser == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsUserSdb,
.pObj = pUser,
.rowSize = sizeof(SUserObj),
};
mInfo("compact users %s", pUser->user);
sdbInsertCompactRow(&row);
}
mInfo("end to compact users table...");
return 0;
}
\ No newline at end of file
...@@ -1302,3 +1302,30 @@ void mnodeSetVgidVer(int8_t *cver, uint64_t iver) { ...@@ -1302,3 +1302,30 @@ void mnodeSetVgidVer(int8_t *cver, uint64_t iver) {
cver[1] = (int8_t)((int32_t)(iver % 100000) / 100); cver[1] = (int8_t)((int32_t)(iver % 100000) / 100);
cver[2] = (int8_t)(iver % 100); cver[2] = (int8_t)(iver % 100);
} }
int32_t mnodeCompactVgroups() {
void *pIter = NULL;
SVgObj *pVgroup = NULL;
mInfo("start to compact vgroups table...");
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj),
};
mInfo("compact vgroups %d", pVgroup->vgId);
sdbInsertCompactRow(&row);
}
mInfo("end to compact vgroups table...");
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_MIPS64_H
#define TDENGINE_OS_MIPS64_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
#include <stdlib.h>
#include <argp.h>
#include <arpa/inet.h>
#include <assert.h>
#include <ctype.h>
#include <dirent.h>
#include <endian.h>
#include <errno.h>
#include <float.h>
#include <ifaddrs.h>
#include <libgen.h>
#include <limits.h>
#include <locale.h>
#include <math.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <pthread.h>
#include <pwd.h>
#include <regex.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/file.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/statvfs.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <syslog.h>
#include <termios.h>
#include <unistd.h>
#include <wchar.h>
#include <wordexp.h>
#include <wctype.h>
#include <inttypes.h>
#include <fcntl.h>
#include <sys/utsname.h>
#include <sys/resource.h>
#include <error.h>
#include <linux/sysctl.h>
#include <math.h>
#include <poll.h>
#ifdef __cplusplus
}
#endif
#endif
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
typedef void (*FLinuxSignalHandler)(int32_t signum, siginfo_t *sigInfo, void *context); typedef void (*FLinuxSignalHandler)(int32_t signum, siginfo_t *sigInfo, void *context);
void taosSetSignal(int32_t signum, FSignalHandler sigfp) { void taosSetSignal(int32_t signum, FSignalHandler sigfp) {
struct sigaction act = {{0}}; struct sigaction act; memset(&act, 0, sizeof(act));
#if 1 #if 1
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
act.sa_sigaction = (FLinuxSignalHandler)sigfp; act.sa_sigaction = (FLinuxSignalHandler)sigfp;
......
...@@ -417,3 +417,13 @@ void monExecuteSQL(char *sql) { ...@@ -417,3 +417,13 @@ void monExecuteSQL(char *sql) {
monDebug("execute sql:%s", sql); monDebug("execute sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql"); taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql");
} }
void monExecuteSQLWithResultCallback(char *sql, MonExecuteSQLCbFP callback, void* param) {
if (tsMonitor.conn == NULL) {
callback(param, NULL, TSDB_CODE_MON_CONNECTION_INVALID);
return;
}
monDebug("execute sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, callback, param);
}
...@@ -40,7 +40,7 @@ typedef struct SHeapEntry { ...@@ -40,7 +40,7 @@ typedef struct SHeapEntry {
} SHeapEntry; } SHeapEntry;
typedef struct SHistogramInfo { typedef struct SHistogramInfo {
int32_t numOfElems; int64_t numOfElems;
int32_t numOfEntries; int32_t numOfEntries;
int32_t maxEntries; int32_t maxEntries;
double min; double min;
......
...@@ -446,7 +446,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto) { ...@@ -446,7 +446,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto) {
} }
void tHistogramPrint(SHistogramInfo* pHisto) { void tHistogramPrint(SHistogramInfo* pHisto) {
printf("total entries: %d, elements: %d\n", pHisto->numOfEntries, pHisto->numOfElems); printf("total entries: %d, elements: %"PRId64 "\n", pHisto->numOfEntries, pHisto->numOfElems);
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
for (int32_t i = 0; i < pHisto->numOfEntries; ++i) { for (int32_t i = 0; i < pHisto->numOfEntries; ++i) {
printf("%d: (%f, %" PRId64 ")\n", i + 1, pHisto->elems[i].val, pHisto->elems[i].num); printf("%d: (%f, %" PRId64 ")\n", i + 1, pHisto->elems[i].val, pHisto->elems[i].num);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
3. This notice may not be removed or altered from any source distribution. 3. This notice may not be removed or altered from any source distribution.
*/ */
#ifndef _TD_ARM_ #if !defined(_TD_ARM_) && !defined(_TD_MIPS_)
#include <nmmintrin.h> #include <nmmintrin.h>
#endif #endif
......
...@@ -120,12 +120,14 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -120,12 +120,14 @@ int32_t vnodeDrop(int32_t vgId) {
vDebug("vgId:%d, failed to drop, vnode not find", vgId); vDebug("vgId:%d, failed to drop, vnode not find", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
} }
if (pVnode->dropped) {
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
}
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
pVnode->dropped = 1; pVnode->dropped = 1;
// remove from hash, so new messages wont be consumed
vnodeRemoveFromHash(pVnode);
vnodeRelease(pVnode); vnodeRelease(pVnode);
vnodeCleanupInMWorker(pVnode); vnodeCleanupInMWorker(pVnode);
...@@ -425,6 +427,10 @@ int32_t vnodeOpen(int32_t vgId) { ...@@ -425,6 +427,10 @@ int32_t vnodeOpen(int32_t vgId) {
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return 0; if (pVnode == NULL) return 0;
if (pVnode->dropped) {
vnodeRelease(pVnode);
return 0;
}
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRemoveFromHash(pVnode); vnodeRemoveFromHash(pVnode);
...@@ -510,6 +516,8 @@ void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -510,6 +516,8 @@ void vnodeCleanUp(SVnodeObj *pVnode) {
vnodeSetClosingStatus(pVnode); vnodeSetClosingStatus(pVnode);
vnodeRemoveFromHash(pVnode);
// stop replication module // stop replication module
if (pVnode->sync > 0) { if (pVnode->sync > 0) {
int64_t sync = pVnode->sync; int64_t sync = pVnode->sync;
......
...@@ -117,14 +117,17 @@ static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t con ...@@ -117,14 +117,17 @@ static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t con
} }
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
if (pVnode->dropped) {
return TSDB_CODE_APP_NOT_READY;
}
SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam); SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam);
if (pRead == NULL) { if (pRead == NULL) {
assert(terrno != 0); assert(terrno != 0);
return terrno; return terrno;
} }
SVnodeObj *pVnode = vparam;
int32_t code = vnodeCheckRead(pVnode); int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pRead); taosFreeQitem(pRead);
......
...@@ -66,6 +66,9 @@ static bool vnodeSetClosingStatusImp(SVnodeObj* pVnode) { ...@@ -66,6 +66,9 @@ static bool vnodeSetClosingStatusImp(SVnodeObj* pVnode) {
} }
bool vnodeSetClosingStatus(SVnodeObj* pVnode) { bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return true;
while (!vnodeSetClosingStatusImp(pVnode)) { while (!vnodeSetClosingStatusImp(pVnode)) {
taosMsleep(1); taosMsleep(1);
} }
......
...@@ -55,6 +55,11 @@ void vnodeNotifyRole(int32_t vgId, int8_t role) { ...@@ -55,6 +55,11 @@ void vnodeNotifyRole(int32_t vgId, int8_t role) {
vTrace("vgId:%d, vnode not found while notify role", vgId); vTrace("vgId:%d, vnode not found while notify role", vgId);
return; return;
} }
if (pVnode->dropped) {
vTrace("vgId:%d, vnode dropped while notify role", vgId);
vnodeRelease(pVnode);
return;
}
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]); vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
pVnode->role = role; pVnode->role = role;
...@@ -75,6 +80,11 @@ void vnodeCtrlFlow(int32_t vgId, int32_t level) { ...@@ -75,6 +80,11 @@ void vnodeCtrlFlow(int32_t vgId, int32_t level) {
vTrace("vgId:%d, vnode not found while flow ctrl", vgId); vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
return; return;
} }
if (pVnode->dropped) {
vTrace("vgId:%d, vnode dropped while flow ctrl", vgId);
vnodeRelease(pVnode);
return;
}
if (pVnode->flowctrlLevel != level) { if (pVnode->flowctrlLevel != level) {
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level); vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
...@@ -129,6 +139,7 @@ int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rpara ...@@ -129,6 +139,7 @@ int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rpara
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId); vError("vgId:%d, vnode not found while write to cache", vgId);
vnodeRelease(pVnode);
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
} }
......
...@@ -340,8 +340,11 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { ...@@ -340,8 +340,11 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
if (pWrite->processedCount >= 100) { if (pWrite->processedCount >= 100) {
vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code), vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code),
pWrite->processedCount); pWrite->processedCount);
pWrite->processedCount = 1; void *handle = pWrite->rpcMsg.handle;
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); taosFreeQitem(pWrite);
vnodeRelease(pVnode);
SRpcMsg rpcRsp = {.handle = handle, .code = code};
rpcSendResponse(&rpcRsp);
} else { } else {
code = vnodePerformFlowCtrl(pWrite); code = vnodePerformFlowCtrl(pWrite);
if (code == 0) { if (code == 0) {
...@@ -386,4 +389,6 @@ void vnodeWaitWriteCompleted(SVnodeObj *pVnode) { ...@@ -386,4 +389,6 @@ void vnodeWaitWriteCompleted(SVnodeObj *pVnode) {
vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg); vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg);
taosMsleep(10); taosMsleep(10);
} }
taosMsleep(900);
} }
...@@ -74,7 +74,7 @@ function runQueryPerfTest { ...@@ -74,7 +74,7 @@ function runQueryPerfTest {
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
python3 perfbenchmark/joinPerformance.py | tee -a $PERFORMANCE_TEST_REPORT #python3 perfbenchmark/joinPerformance.py | tee -a $PERFORMANCE_TEST_REPORT
} }
......
...@@ -104,7 +104,7 @@ class taosdemoPerformace: ...@@ -104,7 +104,7 @@ class taosdemoPerformace:
return output return output
def insertData(self): def insertData(self):
os.system("taosdemo -f %s > taosdemoperf.txt" % self.generateJson()) os.system("taosdemo -f %s > taosdemoperf.txt 2>&1" % self.generateJson())
self.createTableTime = self.getCMDOutput("grep 'Spent' taosdemoperf.txt | awk 'NR==1{print $2}'") self.createTableTime = self.getCMDOutput("grep 'Spent' taosdemoperf.txt | awk 'NR==1{print $2}'")
self.insertRecordsTime = self.getCMDOutput("grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $2}'") self.insertRecordsTime = self.getCMDOutput("grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $2}'")
self.recordsPerSecond = self.getCMDOutput("grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $16}'") self.recordsPerSecond = self.getCMDOutput("grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $16}'")
......
...@@ -27,28 +27,28 @@ class TDTestCase: ...@@ -27,28 +27,28 @@ class TDTestCase:
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath): if "community" in selfPath:
projPath = selfPath[:selfPath.find("community")] projPath = selfPath[: selfPath.find("community")]
else: else:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[: selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if "taosd" in files:
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if "packaging" not in rootRealPath:
buildPath = root[:len(root)-len("/build/bin")] buildPath = root[: len(root) - len("/build/bin")]
break break
return buildPath return buildPath
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
buildPath = self.getBuildPath() buildPath = self.getBuildPath()
if (buildPath == ""): if buildPath == "":
tdLog.exit("taosd not found!") tdLog.exit("taosd not found!")
else: else:
tdLog.info("taosd found in %s" % buildPath) tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/" binPath = buildPath + "/build/bin/"
os.system("yes | %staosdemo -f tools/insert.json" % binPath) os.system("%staosdemo -f tools/insert.json -y" % binPath)
tdSql.execute("use db01") tdSql.execute("use db01")
tdSql.query("select count(*) from stb01") tdSql.query("select count(*) from stb01")
......
...@@ -68,7 +68,7 @@ while $loop <= $loops ...@@ -68,7 +68,7 @@ while $loop <= $loops
while $i < 10 while $i < 10
sql select count(*) from $stb where t1 = $i sql select count(*) from $stb where t1 = $i
if $data00 != $rowNum then if $data00 != $rowNum then
print expect $rowNum, actual: $data00 print expect $rowNum , actual: $data00
return -1 return -1
endi endi
$i = $i + 1 $i = $i + 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册