未验证 提交 485aecba 编写于 作者: T Tao Liu 提交者: GitHub

Merge branch 'master' into hotfix/blm_prometheus

...@@ -184,6 +184,7 @@ func main() { ...@@ -184,6 +184,7 @@ func main() {
} }
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
addr := strings.Split(r.RemoteAddr, ":") addr := strings.Split(r.RemoteAddr, ":")
idx := TAOShashID([]byte(addr[0])) idx := TAOShashID([]byte(addr[0]))
...@@ -192,7 +193,7 @@ func main() { ...@@ -192,7 +193,7 @@ func main() {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
r.Body.Close()
reqBuf, err := snappy.Decode(nil, compressed) reqBuf, err := snappy.Decode(nil, compressed)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
...@@ -205,7 +206,7 @@ func main() { ...@@ -205,7 +206,7 @@ func main() {
return return
} }
nodeChans[idx%httpworkers] <- req nodeChans[idx%httpworkers] <- req
w.WriteHeader(http.StatusAccepted)
}) })
http.HandleFunc("/check", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/check", func(w http.ResponseWriter, r *http.Request) {
......
...@@ -158,7 +158,7 @@ func main() { ...@@ -158,7 +158,7 @@ func main() {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
r.Body.Close()
var req Metrics var req Metrics
if err := json.Unmarshal(reqBuf, &req); err != nil { if err := json.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
...@@ -167,8 +167,9 @@ func main() { ...@@ -167,8 +167,9 @@ func main() {
req.HostIP = addr[0] req.HostIP = addr[0]
nodeChans[idx%httpworkers] <- req nodeChans[idx%httpworkers] <- req
r.Body.Close() r.Body.Close()
}) })
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "tutil.h" #include "tutil.h"
#include "tnote.h" #include "tnote.h"
extern void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
...@@ -494,11 +495,10 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -494,11 +495,10 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if(pMeterMetaInfo->pMeterMeta == NULL) {
code = tscGetMeterMeta(pSql, pMeterMetaInfo); code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code; assert(code == TSDB_CODE_SUCCESS);
}
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL); assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
...@@ -554,13 +554,27 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -554,13 +554,27 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
if (pSql->pStream) { if (pSql->pStream) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/* /*
* NOTE: * NOTE:
* transfer the sql function for super table query before get meter/metric meta, * transfer the sql function for super table query before get meter/metric meta,
* since in callback functions, only tscProcessSql(pStream->pSql) is executed! * since in callback functions, only tscProcessSql(pStream->pSql) is executed!
*/ */
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if ((UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)
&& ( pMeterMetaInfo->pMeterMeta == NULL
|| pMeterMetaInfo->pMetricMeta == NULL
|| pMeterMetaInfo->pMetricMeta->numOfMeters == 0
|| pMeterMetaInfo->pMetricMeta->numOfVnodes == 0))
|| (!(UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) && (pMeterMetaInfo->pMeterMeta == NULL))) {
tscTrace("%p stream:%p meta is updated, but no table, clear meter meta and set next launch new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
tscClearMeterMetaInfo(pMeterMetaInfo, false);
tscSetNextLaunchTimer(pSql->pStream, pSql);
return;
}
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream); tscIncStreamExecutionCount(pSql->pStream);
......
...@@ -3678,7 +3678,7 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c ...@@ -3678,7 +3678,7 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c
str++; str++;
int32_t ret = WCSPatternMatch(&patterStr[i], str, wcslen(str), pInfo); int32_t ret = WCSPatternMatch(&patterStr[i], str, twcslen(str), pInfo);
if (ret != TSDB_PATTERN_NOMATCH) { if (ret != TSDB_PATTERN_NOMATCH) {
return ret; return ret;
} }
......
...@@ -434,6 +434,19 @@ static void tscProcessServStatus(SSqlObj *pSql) { ...@@ -434,6 +434,19 @@ static void tscProcessServStatus(SSqlObj *pSql) {
if (pObj->pHb->res.code == TSDB_CODE_NETWORK_UNAVAIL) { if (pObj->pHb->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL; pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL;
return; return;
} else {
int32_t* data = (int32_t*) pObj->pHb->res.data;
if (data != NULL) {
int32_t totalDnode = data[0];
int32_t onlineDnode = data[1];
assert(onlineDnode <= totalDnode);
if (onlineDnode < totalDnode) {
pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL;
return;
}
}
} }
} else { } else {
if (pSql->res.code == TSDB_CODE_NETWORK_UNAVAIL) { if (pSql->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
......
...@@ -2595,7 +2595,7 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn ...@@ -2595,7 +2595,7 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn
tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType); tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType);
size_t len = wcslen((wchar_t*)pColumnFilter->pz); size_t len = twcslen((wchar_t*)pColumnFilter->pz);
pColumnFilter->len = len * TSDB_NCHAR_SIZE; pColumnFilter->len = len * TSDB_NCHAR_SIZE;
} else { } else {
tVariantDump(&pRight->val, (char*)&pColumnFilter->lowerBndd, colType); tVariantDump(&pRight->val, (char*)&pColumnFilter->lowerBndd, colType);
...@@ -4657,6 +4657,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* ...@@ -4657,6 +4657,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
if (pMeterMetaInfo->pMeterMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfMeters == 0) { if (pMeterMetaInfo->pMeterMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfMeters == 0) {
tscTrace("%p no table in metricmeta, no output result", pSql); tscTrace("%p no table in metricmeta, no output result", pSql);
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
pSql->res.qhandle = 0x1; // to pass the qhandle check;
} }
// keep original limitation value in globalLimit // keep original limitation value in globalLimit
......
...@@ -231,13 +231,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -231,13 +231,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
idx += 1; idx += 1;
} }
} }
assert(idx >= pReducer->numOfBuffer);
if (idx == 0) { if (idx == 0) {
free(pReducer); free(pReducer);
return; return;
} }
pReducer->numOfBuffer = idx; pReducer->numOfBuffer = idx; // the actual entries that has result for merge
SCompareParam *param = malloc(sizeof(SCompareParam)); SCompareParam *param = malloc(sizeof(SCompareParam));
param->pLocalData = pReducer->pLocalDataSrc; param->pLocalData = pReducer->pLocalDataSrc;
......
...@@ -106,12 +106,12 @@ static int32_t tscGetMgmtConnMaxRetryTimes() { ...@@ -106,12 +106,12 @@ static int32_t tscGetMgmtConnMaxRetryTimes() {
return tscMgmtIpList.numOfIps * factor; return tscMgmtIpList.numOfIps * factor;
} }
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { int32_t tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return; if (pObj == NULL) return TSDB_CODE_APP_ERROR;
if (pObj != pObj->signature) { if (pObj != pObj->signature) {
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
return; return TSDB_CODE_APP_ERROR;
} }
SSqlObj *pSql = pObj->pHb; SSqlObj *pSql = pObj->pHb;
...@@ -128,11 +128,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -128,11 +128,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId); if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId);
if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId); if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId);
} }
if (pRes->data == NULL) {
pRes->data = calloc(2, sizeof(int32_t));
}
((int32_t*)pRes->data)[0] = htonl(pRsp->totalDnodes);
((int32_t*)pRes->data)[1] = htonl(pRsp->onlineDnodes);
} else { } else {
tscTrace("heart beat failed, code:%d", code); tscTrace("heart beat failed, code:%d", code);
} }
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return code;
} }
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
......
...@@ -729,6 +729,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -729,6 +729,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
} }
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
#if 0
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -768,6 +769,11 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -768,6 +769,11 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
} }
return nRows; return nRows;
#endif
(*rows) = taos_fetch_row(res);
return ((*rows) != NULL)? 1:0;
} }
int taos_select_db(TAOS *taos, const char *db) { int taos_select_db(TAOS *taos, const char *db) {
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql); void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer); static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer);
static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) { static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) {
...@@ -97,6 +97,18 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { ...@@ -97,6 +97,18 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
return; return;
} }
if ((UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)
&& ( pMeterMetaInfo->pMeterMeta == NULL
|| pMeterMetaInfo->pMetricMeta == NULL
|| pMeterMetaInfo->pMetricMeta->numOfMeters == 0
|| pMeterMetaInfo->pMetricMeta->numOfVnodes == 0))
|| (!(UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) && (pMeterMetaInfo->pMeterMeta == NULL))) {
tscTrace("%p no table in metricmeta, no launch query", pSql);
tscClearMeterMetaInfo(pMeterMetaInfo, false);
tscSetNextLaunchTimer(pStream, pSql);
return;
}
tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pMeterMetaInfo->name); tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pMeterMetaInfo->name);
tscProcessSql(pStream->pSql); tscProcessSql(pStream->pSql);
...@@ -323,7 +335,7 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { ...@@ -323,7 +335,7 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
} }
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
int64_t timer = 0; int64_t timer = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
......
...@@ -316,6 +316,9 @@ class CTaosInterface(object): ...@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields) blocks = [None] * len(fields)
for i in range(len(fields)): for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i] data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC: if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database") raise DatabaseError("Invalid data type returned from database")
......
...@@ -316,6 +316,9 @@ class CTaosInterface(object): ...@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields) blocks = [None] * len(fields)
for i in range(len(fields)): for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i] data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC: if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database") raise DatabaseError("Invalid data type returned from database")
......
...@@ -821,6 +821,8 @@ typedef struct { ...@@ -821,6 +821,8 @@ typedef struct {
typedef struct { typedef struct {
uint32_t queryId; uint32_t queryId;
uint32_t streamId; uint32_t streamId;
uint32_t totalDnodes;
uint32_t onlineDnodes;
char killConnection; char killConnection;
SIpList ipList; SIpList ipList;
} SHeartBeatRsp; } SHeartBeatRsp;
......
...@@ -130,6 +130,7 @@ extern "C" { ...@@ -130,6 +130,7 @@ extern "C" {
#define POW2(x) ((x) * (x)) #define POW2(x) ((x) * (x))
size_t twcslen(const wchar_t *wcs);
int32_t strdequote(char *src); int32_t strdequote(char *src);
void strtrim(char *src); void strtrim(char *src);
......
...@@ -1184,9 +1184,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por ...@@ -1184,9 +1184,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
taosTmrReset(taosProcessIdleTimer, pServer->idleTime, pConn, pChann->tmrCtrl, &pConn->pIdleTimer); taosTmrReset(taosProcessIdleTimer, pServer->idleTime, pConn, pChann->tmrCtrl, &pConn->pIdleTimer);
} }
if (code == TSDB_CODE_ALREADY_PROCESSED) { if (code == TSDB_CODE_ALREADY_PROCESSED || code == TSDB_CODE_LAST_SESSION_NOT_FINISHED) {
tTrace("%s cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label, tTrace("%s code:%d, cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label,
chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId), code, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId),
pHeader->tranId, pConn); pHeader->tranId, pConn);
free(data); free(data);
return pConn; return pConn;
......
...@@ -432,6 +432,8 @@ bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode); ...@@ -432,6 +432,8 @@ bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode);
void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType); void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType); int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes);
extern int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); extern int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn); extern int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
......
...@@ -1185,7 +1185,7 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { ...@@ -1185,7 +1185,7 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) {
char * pStart, *pMsg; char * pStart, *pMsg;
int msgLen; int msgLen;
STaosRsp *pRsp; STaosRsp *pRsp;
mgmtSaveQueryStreamList(cont, contLen, pConn); mgmtSaveQueryStreamList(cont, contLen, pConn);
pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_HEARTBEAT_RSP, 128); pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_HEARTBEAT_RSP, 128);
...@@ -1203,6 +1203,10 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { ...@@ -1203,6 +1203,10 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) {
pConn->streamId = 0; pConn->streamId = 0;
pHBRsp->killConnection = pConn->killConnection; pHBRsp->killConnection = pConn->killConnection;
mgmtGetDnodeOnlineNum(&pHBRsp->totalDnodes, &pHBRsp->onlineDnodes);
pHBRsp->totalDnodes = htonl(pHBRsp->totalDnodes);
pHBRsp->onlineDnodes = htonl(pHBRsp->onlineDnodes);
if (pConn->usePublicIp) { if (pConn->usePublicIp) {
if (pSdbPublicIpList != NULL) { if (pSdbPublicIpList != NULL) {
int size = pSdbPublicIpList->numOfIps * 4; int size = pSdbPublicIpList->numOfIps * 4;
......
...@@ -577,7 +577,7 @@ static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) { ...@@ -577,7 +577,7 @@ static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
const wchar_t* pattern = pRight; const wchar_t* pattern = pRight;
const wchar_t* str = pLeft; const wchar_t* str = pLeft;
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo); int32_t ret = WCSPatternMatch(pattern, str, twcslen(str), &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
} }
......
...@@ -4648,7 +4648,7 @@ static void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t type, int32_t ...@@ -4648,7 +4648,7 @@ static void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t type, int32_t
len = t + 1 + TSDB_KEYSIZE; len = t + 1 + TSDB_KEYSIZE;
pCtx->param[index].pz = calloc(1, len); pCtx->param[index].pz = calloc(1, len);
} else if (type == TSDB_DATA_TYPE_NCHAR) { } else if (type == TSDB_DATA_TYPE_NCHAR) {
t = wcslen((const wchar_t *)data); t = twcslen((const wchar_t *)data);
len = (t + 1) * TSDB_NCHAR_SIZE + TSDB_KEYSIZE; len = (t + 1) * TSDB_NCHAR_SIZE + TSDB_KEYSIZE;
pCtx->param[index].pz = calloc(1, len); pCtx->param[index].pz = calloc(1, len);
......
...@@ -45,4 +45,9 @@ int mgmtProcessDropAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -45,4 +45,9 @@ int mgmtProcessDropAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) {
int mgmtProcessCreateAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) { int mgmtProcessCreateAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
}
void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes) {
*totalDnodes = 1;
*onlineDnodes = 1;
} }
\ No newline at end of file
...@@ -41,11 +41,13 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { ...@@ -41,11 +41,13 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
char *tmpDir = "/tmp/"; char *tmpDir = "/tmp/";
#endif #endif
int64_t ts = taosGetTimestampUs();
strcpy(tmpPath, tmpDir); strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix); strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix); strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%llu-%u"); strcat(tmpPath, "-%d-%llu-%u-%llu");
snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1)); snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, getpid(), taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1), ts);
} }
/* /*
......
...@@ -197,7 +197,7 @@ int32_t tVariantToString(tVariant *pVar, char *dst) { ...@@ -197,7 +197,7 @@ int32_t tVariantToString(tVariant *pVar, char *dst) {
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
dst[0] = '\''; dst[0] = '\'';
taosUcs4ToMbs(pVar->wpz, (wcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1); taosUcs4ToMbs(pVar->wpz, (twcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1);
int32_t len = strlen(dst); int32_t len = strlen(dst);
dst[len] = '\''; dst[len] = '\'';
dst[len + 1] = 0; dst[len + 1] = 0;
...@@ -430,7 +430,7 @@ static int32_t toNchar(tVariant *pVariant, char **pDest, int32_t *pDestSize) { ...@@ -430,7 +430,7 @@ static int32_t toNchar(tVariant *pVariant, char **pDest, int32_t *pDestSize) {
} }
pVariant->wpz = pWStr; pVariant->wpz = pWStr;
*pDestSize = wcslen(pVariant->wpz); *pDestSize = twcslen(pVariant->wpz);
// shrink the allocate memory, no need to check here. // shrink the allocate memory, no need to check here.
char* tmp = realloc(pVariant->wpz, (*pDestSize + 1)*TSDB_NCHAR_SIZE); char* tmp = realloc(pVariant->wpz, (*pDestSize + 1)*TSDB_NCHAR_SIZE);
......
...@@ -27,6 +27,23 @@ ...@@ -27,6 +27,23 @@
#include "tlog.h" #include "tlog.h"
#include "taoserror.h" #include "taoserror.h"
size_t twcslen(const wchar_t *wcs) {
int *wstr = (int *)wcs;
if (NULL == wstr) {
return 0;
}
size_t n = 0;
while (1) {
if (0 == *wstr++) {
break;
}
n++;
}
return n;
}
int32_t strdequote(char *z) { int32_t strdequote(char *z) {
if (z == NULL) { if (z == NULL) {
return 0; return 0;
......
char version[64] = "1.6.6.1"; char version[64] = "1.6.5.9";
char compatible_version[64] = "1.6.0.0"; char compatible_version[64] = "1.6.0.0";
char gitinfo[128] = "0b5b412ef0ae2449ece538601a29b899b2b727b9"; char gitinfo[128] = "0b5b412ef0ae2449ece538601a29b899b2b727b9";
char gitinfoOfInternal[128] = "8ae0d83a3610b9b4726373dd3073e4a8f444fb26"; char gitinfoOfInternal[128] = "8ae0d83a3610b9b4726373dd3073e4a8f444fb26";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册