提交 017299c0 编写于 作者: S Shengliang Guan

Merge from master

......@@ -172,10 +172,10 @@ IF (TD_WINDOWS)
ENDIF ()
IF (TD_MEMORY_SANITIZER)
MESSAGE("memory sanitizer detected as true")
MESSAGE("memory sanitizer detected as true")
SET(DEBUG_FLAGS "/fsanitize=address /Zi /W3 /GL")
ELSE ()
MESSAGE("memory sanitizer detected as false")
MESSAGE("memory sanitizer detected as false")
SET(DEBUG_FLAGS "/Zi /W3 /GL")
ENDIF ()
SET(RELEASE_FLAGS "/W0 /O3 /GL")
......
......@@ -266,6 +266,7 @@ typedef struct SSqlObj {
typedef struct SSqlStream {
SSqlObj *pSql;
void * cqhandle; // stream belong to SCQContext handle
const char* dstTable;
uint32_t streamId;
char listed;
......
......@@ -468,6 +468,10 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int32_t cnt = 0;
int32_t j = 0;
if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) {
return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z);
}
for (uint32_t k = 1; k < sToken.n - 1; ++k) {
if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) {
tmpTokenBuf[j] = sToken.z[k + 1];
......@@ -711,7 +715,7 @@ static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char
}
code = TSDB_CODE_TSC_INVALID_OPERATION;
char tmpTokenBuf[16*1024] = {0}; // used for deleting Escape character: \\, \', \"
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
int32_t numOfRows = 0;
code = tsParseValues(str, dataBuf, maxNumOfRows, pInsertParam, &numOfRows, tmpTokenBuf);
......
......@@ -19,6 +19,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "taosmsg.h"
#include "tcq.h"
#include "taos.h"
......@@ -294,24 +295,34 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
return msgLen;
}
void tscKillConnection(STscObj *pObj) {
pthread_mutex_lock(&pObj->mutex);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
pSql = pSql->next;
// cqContext->dbconn is killed then call this callback
void cqConnKilledNotify(void* handle, void* conn) {
if (handle == NULL || conn == NULL){
return ;
}
SCqContext* pContext = (SCqContext*) handle;
if (pContext->dbConn == conn){
atomic_store_ptr(&(pContext->dbConn), NULL);
}
}
void tscKillConnection(STscObj *pObj) {
// get stream header by locked
pthread_mutex_lock(&pObj->mutex);
SSqlStream *pStream = pObj->streamList;
pthread_mutex_unlock(&pObj->mutex);
while (pStream) {
SSqlStream *tmp = pStream->next;
// set associate variant to NULL
cqConnKilledNotify(pStream->cqhandle, pObj);
// taos_close_stream function call pObj->mutet lock , careful death-lock
taos_close_stream(pStream);
pStream = tmp;
}
pthread_mutex_unlock(&pObj->mutex);
tscDebug("connection:%p is killed", pObj);
taos_close(pObj);
}
......@@ -438,7 +438,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
if (pzName->type == TK_STRING) {
pzName->n = strdequote(pzName->z);
}
strncpy(pCmd->payload, pzName->z, pzName->n);
} else { // drop user/account
if (pzName->n >= TSDB_USER_LEN) {
......@@ -516,7 +518,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SStrToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (id->type == TK_STRING) {
id->n = strdequote(id->z);
}
break;
}
......@@ -2158,6 +2162,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg7 = "normal table can not apply this function";
const char* msg8 = "multi-columns selection does not support alias column name";
const char* msg9 = "diff can no be applied to unsigned numeric type";
const char* msg10 = "parameter is out of range [1, 100]";
switch (functionId) {
case TSDB_FUNC_COUNT: {
......@@ -2551,7 +2556,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int64_t nTop = GET_INT32_VAL(val);
if (nTop <= 0 || nTop > 100) { // todo use macro
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg10);
}
// todo REFACTOR
......@@ -7059,6 +7064,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
// project query primary column must be timestamp type
if (tscIsProjectionQuery(pQueryInfo)) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
if (pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
......
......@@ -139,8 +139,13 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
// pSql == NULL maybe killStream already called
if(pSql == NULL) {
return ;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
tscDebug("0x%"PRIx64" add into timer", pSql->self);
if (pStream->isProject) {
/*
......@@ -339,8 +344,12 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
if (pStream->isProject) {
int64_t now = taosGetTimestamp(pStream->precision);
int64_t etime = now > pStream->etime ? pStream->etime : now;
int64_t maxRetent = tsMaxRetentWindow * 1000;
if(pStream->precision == TSDB_TIME_PRECISION_MICRO) {
maxRetent *= 1000;
}
if (pStream->etime < now && now - pStream->etime > tsMaxRetentWindow) {
if (pStream->etime < now && now - pStream->etime > maxRetent) {
/*
* current time window will be closed, since it too early to exceed the maxRetentWindow value
*/
......@@ -664,7 +673,7 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
}
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
......@@ -697,6 +706,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pStream->cqhandle = cqhandle;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
......@@ -745,7 +755,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback);
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL);
}
void taos_close_stream(TAOS_STREAM *handle) {
......
......@@ -831,6 +831,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "precision";
cfg.ptr = &tsTimePrecision;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_PRECISION;
cfg.maxValue = TSDB_MAX_PRECISION;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "comp";
cfg.ptr = &tsCompression;
cfg.valType = TAOS_CFG_VTYPE_INT8;
......@@ -901,6 +911,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "cachelast";
cfg.ptr = &tsCacheLastRow;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_DB_CACHE_LAST_ROW;
cfg.maxValue = TSDB_MAX_DB_CACHE_LAST_ROW;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttHostName";
cfg.ptr = tsMqttHostName;
cfg.valType = TAOS_CFG_VTYPE_STRING;
......
......@@ -74,7 +74,7 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
case TSDB_DATA_TYPE_BINARY: {
pVar->pz = strndup(token->z, token->n);
pVar->nLen = strdequote(pVar->pz);
pVar->nLen = strRmquote(pVar->pz, token->n);
break;
}
......
......@@ -177,6 +177,7 @@ public class TSDBResultSetTest {
rs.getAsciiStream("f1");
}
@SuppressWarnings("deprecation")
@Test(expected = SQLFeatureNotSupportedException.class)
public void getUnicodeStream() throws SQLException {
rs.getUnicodeStream("f1");
......
......@@ -38,21 +38,6 @@
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
typedef struct {
int32_t vgId;
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite;
struct SCqObj *pHead;
void *dbConn;
void *tmrCtrl;
pthread_mutex_t mutex;
int32_t delete;
int32_t cqObjNum;
} SCqContext;
typedef struct SCqObj {
tmr_h tmrId;
......@@ -439,7 +424,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
// inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext;
......@@ -453,7 +438,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->tmrId = 0;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \
INT64_MIN, (void *)pObj->rid, NULL, pContext);
// TODO the pObj->pStream may be released if error happens
if (pObj->pStream) {
......
......@@ -31,6 +31,23 @@ typedef struct {
FCqWrite cqWrite;
} SCqCfg;
// SCqContext
typedef struct {
int32_t vgId;
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite;
struct SCqObj *pHead;
void *dbConn;
void *tmrCtrl;
pthread_mutex_t mutex;
int32_t delete;
int32_t cqObjNum;
} SCqContext;
// the following API shall be called by vnode
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
void cqClose(void *handle);
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h"
int32_t strdequote(char *src);
int32_t strRmquote(char *z, int32_t len);
size_t strtrim(char *src);
char * strnchr(char *haystack, char needle, int32_t len, bool skipquote);
char ** strsplit(char *src, const char *delim, int32_t *num);
......
......@@ -52,6 +52,36 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing
}
int32_t strRmquote(char *z, int32_t len){
// delete escape character: \\, \', \"
char delim = z[0];
if (delim != '\'' && delim != '\"') {
return len;
}
int32_t cnt = 0;
int32_t j = 0;
for (uint32_t k = 1; k < len - 1; ++k) {
if (z[k] == '\\' || (z[k] == delim && z[k + 1] == delim)) {
z[j] = z[k + 1];
cnt++;
j++;
k++;
continue;
}
z[j] = z[k];
j++;
}
z[j] = 0;
return len - 2 - cnt;
}
size_t strtrim(char *z) {
int32_t i = 0;
int32_t j = 0;
......
......@@ -21,7 +21,7 @@ def pre_test(){
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
pip3 install ${WKC}/src/connector/python/linux/python3/
pip3 install ${WKC}/src/connector/python/ || echo 0
'''
return 1
}
......
......@@ -23,6 +23,7 @@ class Node:
self.hostIP = hostIP
self.hostName = hostName
self.homeDir = homeDir
self.corePath = '/coredump'
self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)})
def buildTaosd(self):
......@@ -127,21 +128,37 @@ class Node:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def detectCoredumpFile(self):
try:
result = self.conn.run("find /coredump -name 'core_*' ", hide=True)
output = result.stdout
print("output: %s" % output)
return output
except Exception as e:
print("find coredump file error on node %d " % self.index)
logging.exception(e)
class Nodes:
def __init__(self):
self.tdnodes = []
self.tdnodes.append(Node(0, 'root', '52.143.103.7', 'node1', 'a', '/root/'))
self.tdnodes.append(Node(1, 'root', '52.250.48.222', 'node2', 'a', '/root/'))
self.tdnodes.append(Node(2, 'root', '51.141.167.23', 'node3', 'a', '/root/'))
self.tdnodes.append(Node(3, 'root', '52.247.207.173', 'node4', 'a', '/root/'))
self.tdnodes.append(Node(4, 'root', '51.141.166.100', 'node5', 'a', '/root/'))
self.tdnodes.append(Node(0, 'root', '192.168.17.194', 'taosdata', 'r', '/root/'))
# self.tdnodes.append(Node(1, 'root', '52.250.48.222', 'node2', 'a', '/root/'))
# self.tdnodes.append(Node(2, 'root', '51.141.167.23', 'node3', 'a', '/root/'))
# self.tdnodes.append(Node(3, 'root', '52.247.207.173', 'node4', 'a', '/root/'))
# self.tdnodes.append(Node(4, 'root', '51.141.166.100', 'node5', 'a', '/root/'))
def stopOneNode(self, index):
self.tdnodes[index].stopTaosd()
self.tdnodes[index].forceStopOneTaosd()
def startOneNode(self, index):
self.tdnodes[index].startOneTaosd()
def detectCoredumpFile(self, index):
return self.tdnodes[index].detectCoredumpFile()
def stopAllTaosd(self):
for i in range(len(self.tdnodes)):
self.tdnodes[i].stopTaosd()
......@@ -166,14 +183,32 @@ class Nodes:
for i in range(len(self.tdnodes)):
self.tdnodes[i].removeData()
# kill taosd randomly every 10 mins
nodes = Nodes()
loop = 0
while True:
loop = loop + 1
class Test:
def __init__(self):
self.nodes = Nodes()
# kill taosd randomly every 10 mins
def randomlyKillDnode(self):
loop = 0
while True:
index = random.randint(0, 4)
print("loop: %d, kill taosd on node%d" %(loop, index))
nodes.stopOneNode(index)
self.nodes.stopOneNode(index)
time.sleep(60)
nodes.startOneNode(index)
self.nodes.startOneNode(index)
time.sleep(600)
loop = loop + 1
def detectCoredump(self):
loop = 0
while True:
for i in range(len(self.nodes.tdnodes)):
result = self.nodes.detectCoredumpFile(i)
print("core file path is %s" % result)
if result and not result.isspace():
self.nodes.stopAllTaosd()
print("sleep for 10 mins")
time.sleep(600)
test = Test()
test.detectCoredump()
\ No newline at end of file
......@@ -25,7 +25,7 @@ class TDTestCase:
def run(self):
tdSql.query("show variables")
tdSql.checkData(51, 1, 864000)
tdSql.checkData(53, 1, 864000)
def stop(self):
tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册