提交 2105b033 编写于 作者: L lichuang

Merge branch 'develop' into feature/TD-3963

...@@ -283,6 +283,7 @@ typedef struct SSqlStream { ...@@ -283,6 +283,7 @@ typedef struct SSqlStream {
int64_t ctime; // stream created time int64_t ctime; // stream created time
int64_t stime; // stream next executed time int64_t stime; // stream next executed time
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
int64_t ltime; // stream last row time in stream table
SInterval interval; SInterval interval;
void * pTimer; void * pTimer;
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "tutil.h" #include "tutil.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "tscSubquery.h"
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
...@@ -47,8 +48,8 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) { ...@@ -47,8 +48,8 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) {
static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) { static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) {
float retryRangeFactor = 0.3f; float retryRangeFactor = 0.3f;
int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor); int64_t retryDelta = (int64_t)(tsRetryStreamCompDelay * retryRangeFactor);
retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L; retryDelta = ((rand() % retryDelta) + tsRetryStreamCompDelay) * 1000L;
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
// change to ms // change to ms
...@@ -575,6 +576,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -575,6 +576,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
// set stime with ltime if ltime > stime
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime);
if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) {
tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime);
pStream->stime = pStream->ltime;
}
int64_t starttime = tscGetLaunchTimestamp(pStream); int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
...@@ -590,7 +599,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) { ...@@ -590,7 +599,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
pStream->dstTable = dstTable; pStream->dstTable = dstTable;
} }
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), // fetchFp call back
void fetchFpStreamLastRow(void* param ,TAOS_RES* res, int num) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = res;
// get row data set to ltime
tscSetSqlOwner(pSql);
TAOS_ROW row = doSetResultRowData(pSql);
if( row && row[0] ) {
pStream->ltime = *((int64_t*)row[0]);
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
tscDebug(" CQ stream table=%s last row time=%"PRId64" .", dstTable, pStream->ltime);
}
tscClearSqlOwner(pSql);
// no condition call
tscCreateStream(param, pStream->pSql, TSDB_CODE_SUCCESS);
taos_free_result(res);
}
// fp callback
void fpStreamLastRow(void* param ,TAOS_RES* res, int code) {
// check result successful
if (code != TSDB_CODE_SUCCESS) {
tscCreateStream(param, res, TSDB_CODE_SUCCESS);
taos_free_result(res);
return ;
}
// asynchronous fetch last row data
taos_fetch_rows_a(res, fetchFpStreamLastRow, param);
}
void cbParseSql(void* param, TAOS_RES* res, int code) {
// check result successful
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
SSqlCmd* pCmd = &pSql->cmd;
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscDebug("0x%"PRIx64" open stream parse sql failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL);
return;
}
// check dstTable valid
if(pStream->dstTable == NULL || strlen(pStream->dstTable) == 0) {
tscDebug(" cbParseSql dstTable is empty.");
tscCreateStream(param, res, code);
return ;
}
// query stream last row time async
char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", pStream->dstTable);
taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
return ;
}
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) { int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL; if (pObj == NULL || pObj->signature != pObj) return NULL;
...@@ -613,11 +681,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -613,11 +681,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL; return NULL;
} }
pStream->stime = stime; pStream->ltime = INT64_MIN;
pStream->fp = fp; pStream->stime = stime;
pStream->fp = fp;
pStream->callback = callback; pStream->callback = callback;
pStream->param = param; pStream->param = param;
pStream->pSql = pSql; pStream->pSql = pSql;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
tscSetStreamDestTable(pStream, dstTable);
pSql->pStream = pStream; pSql->pStream = pStream;
pSql->param = pStream; pSql->param = pStream;
...@@ -640,10 +713,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -640,10 +713,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pSql->fp = cbParseSql;
pSql->fetchFp = cbParseSql;
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tscCreateStream(pStream, pSql, code); cbParseSql(pStream, pSql, code);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr);
} else {
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
free(pStream); free(pStream);
...@@ -653,6 +733,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -653,6 +733,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return pStream; return pStream;
} }
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback);
}
void taos_close_stream(TAOS_STREAM *handle) { void taos_close_stream(TAOS_STREAM *handle) {
SSqlStream *pStream = (SSqlStream *)handle; SSqlStream *pStream = (SSqlStream *)handle;
......
...@@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting; ...@@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting;
extern char tsEmail[]; extern char tsEmail[];
extern char tsArbitrator[]; extern char tsArbitrator[];
extern int8_t tsArbOnline; extern int8_t tsArbOnline;
extern int64_t tsArbOnlineTimestamp;
extern int32_t tsDnodeId; extern int32_t tsDnodeId;
// common // common
...@@ -75,7 +76,7 @@ extern int32_t tsMinSlidingTime; ...@@ -75,7 +76,7 @@ extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime; extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay; extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay; extern int32_t tsStreamCompStartDelay;
extern int32_t tsStreamCompRetryDelay; extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval; extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow; extern int64_t tsMaxRetentWindow;
......
...@@ -42,6 +42,7 @@ int32_t tsNumOfMnodes = 3; ...@@ -42,6 +42,7 @@ int32_t tsNumOfMnodes = 3;
int8_t tsEnableVnodeBak = 1; int8_t tsEnableVnodeBak = 1;
int8_t tsEnableTelemetryReporting = 1; int8_t tsEnableTelemetryReporting = 1;
int8_t tsArbOnline = 0; int8_t tsArbOnline = 0;
int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME;
char tsEmail[TSDB_FQDN_LEN] = {0}; char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0; int32_t tsDnodeId = 0;
...@@ -93,7 +94,7 @@ int32_t tsMaxStreamComputDelay = 20000; ...@@ -93,7 +94,7 @@ int32_t tsMaxStreamComputDelay = 20000;
int32_t tsStreamCompStartDelay = 10000; int32_t tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly // the stream computing delay time after executing failed, change accordingly
int32_t tsStreamCompRetryDelay = 10; int32_t tsRetryStreamCompDelay = 10*1000;
// The delayed computing ration. 10% of the whole computing time window by default. // The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f; float tsStreamComputDelayRatio = 0.1f;
...@@ -710,7 +711,7 @@ static void doInitGlobalConfig(void) { ...@@ -710,7 +711,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "retryStreamCompDelay"; cfg.option = "retryStreamCompDelay";
cfg.ptr = &tsStreamCompRetryDelay; cfg.ptr = &tsRetryStreamCompDelay;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 10; cfg.minValue = 10;
......
...@@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { ...@@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
taosReleaseRef(cqObjRef, (int64_t)param); taosReleaseRef(cqObjRef, (int64_t)param);
} }
// inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext; pObj->pContext = pContext;
...@@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->tmrId = 0; pObj->tmrId = 0;
if (pObj->pStream == NULL) { if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
// TODO the pObj->pStream may be released if error happens // TODO the pObj->pStream may be released if error happens
if (pObj->pStream) { if (pObj->pStream) {
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
pContext->num++; pContext->num++;
cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr); cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
} else { } else {
......
...@@ -375,6 +375,8 @@ do { \ ...@@ -375,6 +375,8 @@ do { \
#define TSDB_MAX_WAL_SIZE (1024*1024*3) #define TSDB_MAX_WAL_SIZE (1024*1024*3)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
typedef enum { typedef enum {
TAOS_QTYPE_RPC = 0, TAOS_QTYPE_RPC = 0,
TAOS_QTYPE_FWD = 1, TAOS_QTYPE_FWD = 1,
......
...@@ -941,7 +941,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -941,7 +941,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = 0; *(int64_t *)pWrite = tsArbOnlineTimestamp;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
...@@ -1150,7 +1150,12 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1150,7 +1150,12 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer->rid, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer->rid, connFd);
if (pPeer->isArb) tsArbOnline = 1; if (pPeer->isArb) {
tsArbOnline = 1;
if (tsArbOnlineTimestamp == TSDB_ARB_DUMMY_TIME) {
tsArbOnlineTimestamp = taosGetTimestampMs();
}
}
} else { } else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
......
...@@ -432,7 +432,7 @@ class TDDnodes: ...@@ -432,7 +432,7 @@ class TDDnodes:
self.simDeployed = False self.simDeployed = False
def init(self, path): def init(self, path):
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID): while(processID):
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID killCmd = "kill -TERM %s > /dev/null 2>&1" % processID
...@@ -545,14 +545,14 @@ class TDDnodes: ...@@ -545,14 +545,14 @@ class TDDnodes:
for i in range(len(self.dnodes)): for i in range(len(self.dnodes)):
self.dnodes[i].stop() self.dnodes[i].stop()
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
if processID: if processID:
cmd = "sudo systemctl stop taosd" cmd = "sudo systemctl stop taosd"
os.system(cmd) os.system(cmd)
# if os.system(cmd) != 0 : # if os.system(cmd) != 0 :
# tdLog.exit(cmd) # tdLog.exit(cmd)
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID): while(processID):
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID killCmd = "kill -TERM %s > /dev/null 2>&1" % processID
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册