提交 1a8d66ce 编写于 作者: M Minglei Jin

Merge branch 'develop' into feature/TD-2502-v2

...@@ -265,8 +265,14 @@ function install_config() { ...@@ -265,8 +265,14 @@ function install_config() {
[ -f ${cfg_dir}/taos.cfg ] && ${csudo} cp ${cfg_dir}/taos.cfg ${cfg_install_dir} [ -f ${cfg_dir}/taos.cfg ] && ${csudo} cp ${cfg_dir}/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/* ${csudo} chmod 644 ${cfg_install_dir}/*
fi fi
# Save standard input to 6 and open / dev / TTY on standard input
exec 6<&0 0</dev/tty
local_fqdn_check local_fqdn_check
# restore the backup standard input, and turn off 6
exec 0<&6 6<&-
${csudo} mv ${cfg_dir}/taos.cfg ${cfg_dir}/taos.cfg.org ${csudo} mv ${cfg_dir}/taos.cfg ${cfg_dir}/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${cfg_dir} ${csudo} ln -s ${cfg_install_dir}/taos.cfg ${cfg_dir}
...@@ -422,7 +428,7 @@ function install_service() { ...@@ -422,7 +428,7 @@ function install_service() {
} }
function install_TDengine() { function install_TDengine() {
echo -e "${GREEN}Start to install TDEngine...${NC}" echo -e "${GREEN}Start to install TDengine...${NC}"
#install log and data dir , then ln to /usr/local/taos #install log and data dir , then ln to /usr/local/taos
${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir} ${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir}
......
...@@ -119,4 +119,4 @@ if ((${service_mod}==2)); then ...@@ -119,4 +119,4 @@ if ((${service_mod}==2)); then
kill_taosd kill_taosd
fi fi
echo -e "${GREEN}TDEngine is removed successfully!${NC}" echo -e "${GREEN}TDengine is removed successfully!${NC}"
...@@ -2,19 +2,39 @@ ...@@ -2,19 +2,39 @@
# #
# This file is used to set config for core when taosd crash # This file is used to set config for core when taosd crash
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
set -e set -e
# set -x # set -x
corePath=$1
csudo="" csudo=""
if command -v sudo > /dev/null; then if command -v sudo > /dev/null; then
csudo="sudo" csudo="sudo"
fi fi
#ulimit -c unlimited if [[ ! -n ${corePath} ]]; then
echo -e -n "${GREEN}Please enter a file directory to save the coredump file${NC}:"
read corePath
while true; do
if [[ ! -z "$corePath" ]]; then
break
else
read -p "Please enter a file directory to save the coredump file:" corePath
fi
done
fi
ulimit -c unlimited
${csudo} sed -i '/ulimit -c unlimited/d' /etc/profile ||: ${csudo} sed -i '/ulimit -c unlimited/d' /etc/profile ||:
${csudo} sed -i '$a\ulimit -c unlimited' /etc/profile ||: ${csudo} sed -i '$a\ulimit -c unlimited' /etc/profile ||:
source /etc/profile source /etc/profile
${csudo} mkdir -p /coredump ||: ${csudo} mkdir -p ${corePath} ||:
${csudo} sysctl -w kernel.core_pattern='/coredump/core-%e-%p' ||: ${csudo} sysctl -w kernel.core_pattern=${corePath}/core-%e-%p ||:
${csudo} echo '/coredump/core-%e-%p' | ${csudo} tee /proc/sys/kernel/core_pattern ||: ${csudo} echo "${corePath}/core-%e-%p" | ${csudo} tee /proc/sys/kernel/core_pattern ||:
...@@ -330,7 +330,7 @@ void bnReset() { ...@@ -330,7 +330,7 @@ void bnReset() {
tsAccessSquence = 0; tsAccessSquence = 0;
} }
static int32_t bnMonitorVgroups() { static bool bnMonitorVgroups() {
void * pIter = NULL; void * pIter = NULL;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
bool hasUpdatingVgroup = false; bool hasUpdatingVgroup = false;
...@@ -489,6 +489,7 @@ void bnCheckStatus() { ...@@ -489,6 +489,7 @@ void bnCheckStatus() {
mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence, mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence,
pDnode->lastAccess, pDnode->status); pDnode->lastAccess, pDnode->status);
bnSetVgroupOffline(pDnode); bnSetVgroupOffline(pDnode);
bnStartTimer(3000);
} }
} }
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
......
...@@ -31,7 +31,10 @@ static void *bnThreadFunc(void *arg) { ...@@ -31,7 +31,10 @@ static void *bnThreadFunc(void *arg) {
} }
pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex); pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex);
mDebug("balance thread wakes up to work");
bool updateSoon = bnStart(); bool updateSoon = bnStart();
mDebug("balance thread finished this poll, updateSoon:%d", updateSoon);
bnStartTimer(updateSoon ? 1000 : -1); bnStartTimer(updateSoon ? 1000 : -1);
pthread_mutex_unlock(&(tsBnThread.mutex)); pthread_mutex_unlock(&(tsBnThread.mutex));
} }
...@@ -101,8 +104,8 @@ static void bnProcessTimer(void *handle, void *tmrId) { ...@@ -101,8 +104,8 @@ static void bnProcessTimer(void *handle, void *tmrId) {
tsBnThread.timer = NULL; tsBnThread.timer = NULL;
tsAccessSquence++; tsAccessSquence++;
bnCheckStatus();
bnStartTimer(-1); bnStartTimer(-1);
bnCheckStatus();
if (handle == NULL) { if (handle == NULL) {
if (tsAccessSquence % tsBalanceInterval == 0) { if (tsAccessSquence % tsBalanceInterval == 0) {
...@@ -121,6 +124,7 @@ void bnStartTimer(int64_t mseconds) { ...@@ -121,6 +124,7 @@ void bnStartTimer(int64_t mseconds) {
bool updateSoon = (mseconds != -1); bool updateSoon = (mseconds != -1);
if (updateSoon) { if (updateSoon) {
mTrace("balance function will be called after %" PRId64 " ms", mseconds);
taosTmrReset(bnProcessTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBnThread.timer); taosTmrReset(bnProcessTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBnThread.timer);
} else { } else {
taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer); taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer);
......
...@@ -388,10 +388,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) { ...@@ -388,10 +388,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
return; return;
} }
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){ if (pSql->fp == NULL || pSql->fetchFp == NULL){
return; return;
} }
......
...@@ -2597,14 +2597,23 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) { ...@@ -2597,14 +2597,23 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
} }
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
}
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) { if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (SAPercentileInfo*) pCtx->aOutputBuf; pInfo = (SAPercentileInfo*) pCtx->aOutputBuf;
} else { } else {
return GET_ROWCELL_INTERBUF(pResInfo); pInfo = GET_ROWCELL_INTERBUF(pResInfo);
} }
buildHistogramInfo(pInfo);
return pInfo;
} }
static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
...@@ -2616,6 +2625,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { ...@@ -2616,6 +2625,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems);
return true; return true;
} }
...@@ -2624,6 +2634,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { ...@@ -2624,6 +2634,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx); SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
assert(pInfo->pHisto->elems != NULL);
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i); char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
......
...@@ -911,6 +911,13 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, ...@@ -911,6 +911,13 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
} }
} }
if (pRes->numOfRowsGroup >= pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) {
pRes->numOfRows = 0;
pBeforeFillData->num = 0;
pLocalReducer->discard = true;
return;
}
pRes->numOfRowsGroup += pRes->numOfRows; pRes->numOfRowsGroup += pRes->numOfRows;
// impose the limitation of output rows on the final result // impose the limitation of output rows on the final result
......
...@@ -284,16 +284,18 @@ void taos_close(TAOS *taos) { ...@@ -284,16 +284,18 @@ void taos_close(TAOS *taos) {
return; return;
} }
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); if (RID_VALID(pObj->hbrid)) {
if (pHb != NULL) { SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pHb->rpcRid > 0) { // wait for rsp from dnode if (pHb != NULL) {
rpcCancelRequest(pHb->rpcRid); if (RID_VALID(pHb->rpcRid)) { // wait for rsp from dnode
pHb->rpcRid = -1; rpcCancelRequest(pHb->rpcRid);
} pHb->rpcRid = -1;
}
tscDebug("%p HB is freed", pHb); tscDebug("%p HB is freed", pHb);
taosReleaseRef(tscObjRef, pHb->self); taosReleaseRef(tscObjRef, pHb->self);
taos_free_result(pHb); taos_free_result(pHb);
}
} }
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
......
...@@ -101,7 +101,8 @@ extern int32_t tsAlternativeRole; ...@@ -101,7 +101,8 @@ extern int32_t tsAlternativeRole;
extern int32_t tsBalanceInterval; extern int32_t tsBalanceInterval;
extern int32_t tsOfflineThreshold; extern int32_t tsOfflineThreshold;
extern int32_t tsMnodeEqualVnodeNum; extern int32_t tsMnodeEqualVnodeNum;
extern int32_t tsFlowCtrl; extern int32_t tsEnableFlowCtrl;
extern int32_t tsEnableSlaveQuery;
// restful // restful
extern int32_t tsEnableHttpModule; extern int32_t tsEnableHttpModule;
......
...@@ -138,7 +138,8 @@ int32_t tsAlternativeRole = 0; ...@@ -138,7 +138,8 @@ int32_t tsAlternativeRole = 0;
int32_t tsBalanceInterval = 300; // seconds int32_t tsBalanceInterval = 300; // seconds
int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsOfflineThreshold = 86400*100; // seconds 10days
int32_t tsMnodeEqualVnodeNum = 4; int32_t tsMnodeEqualVnodeNum = 4;
int32_t tsFlowCtrl = 1; int32_t tsEnableFlowCtrl = 1;
int32_t tsEnableSlaveQuery = 1;
// restful // restful
int32_t tsEnableHttpModule = 1; int32_t tsEnableHttpModule = 1;
...@@ -221,7 +222,7 @@ int32_t uDebugFlag = 131; ...@@ -221,7 +222,7 @@ int32_t uDebugFlag = 131;
int32_t debugFlag = 0; int32_t debugFlag = 0;
int32_t sDebugFlag = 135; int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135; int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131; uint32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t (*monStartSystemFp)() = NULL; int32_t (*monStartSystemFp)() = NULL;
...@@ -542,7 +543,7 @@ static void doInitGlobalConfig(void) { ...@@ -542,7 +543,7 @@ static void doInitGlobalConfig(void) {
cfg.ptr = &tsOfflineThreshold; cfg.ptr = &tsOfflineThreshold;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 5; cfg.minValue = 3;
cfg.maxValue = 7200000; cfg.maxValue = 7200000;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_SECOND; cfg.unitType = TAOS_CFG_UTYPE_SECOND;
...@@ -1004,7 +1005,17 @@ static void doInitGlobalConfig(void) { ...@@ -1004,7 +1005,17 @@ static void doInitGlobalConfig(void) {
// module configs // module configs
cfg.option = "flowctrl"; cfg.option = "flowctrl";
cfg.ptr = &tsFlowCtrl; cfg.ptr = &tsEnableFlowCtrl;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "slaveQuery";
cfg.ptr = &tsEnableSlaveQuery;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0; cfg.minValue = 0;
......
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944
...@@ -349,11 +349,12 @@ CTaosInterface.prototype.useResult = function useResult(result) { ...@@ -349,11 +349,12 @@ CTaosInterface.prototype.useResult = function useResult(result) {
return fields; return fields;
} }
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) { CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data //let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let num_of_rows = this.libtaos.taos_fetch_block(result, pblock) let pblock = this.libtaos.taos_fetch_row(result);
if (num_of_rows == 0) { if (pblock == null) {
return {block:null, num_of_rows:0}; return {block:null, num_of_rows:0};
} }
var fieldL = this.libtaos.taos_fetch_lengths(result); var fieldL = this.libtaos.taos_fetch_lengths(result);
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO); let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
...@@ -361,7 +362,6 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) { ...@@ -361,7 +362,6 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
var fieldlens = []; var fieldlens = [];
if (ref.isNull(fieldL) == false) { if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) { for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 4, i*4); let plen = ref.reinterpret(fieldL, 4, i*4);
let len = plen.readInt32LE(0); let len = plen.readInt32LE(0);
......
...@@ -113,6 +113,7 @@ static void dnodeCleanupTmr() { ...@@ -113,6 +113,7 @@ static void dnodeCleanupTmr() {
int32_t dnodeInitSystem() { int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE); dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE);
tscEmbedded = 1; tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE(); taosBlockSIGPIPE();
taosResolveCRC(); taosResolveCRC();
taosInitGlobalCfg(); taosInitGlobalCfg();
...@@ -120,7 +121,6 @@ int32_t dnodeInitSystem() { ...@@ -120,7 +121,6 @@ int32_t dnodeInitSystem() {
taosSetCoreDump(); taosSetCoreDump();
taosInitNotes(); taosInitNotes();
dnodeInitTmr(); dnodeInitTmr();
signal(SIGPIPE, SIG_IGN);
if (dnodeCreateDir(tsLogDir) < 0) { if (dnodeCreateDir(tsLogDir) < 0) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
......
...@@ -54,6 +54,7 @@ void dnodeCleanupVRead() { ...@@ -54,6 +54,7 @@ void dnodeCleanupVRead() {
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
int32_t code = TSDB_CODE_VND_INVALID_VGROUP_ID;
char * pCont = pMsg->pCont; char * pCont = pMsg->pCont;
while (leftLen > 0) { while (leftLen > 0) {
...@@ -64,7 +65,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { ...@@ -64,7 +65,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
assert(pHead->contLen > 0); assert(pHead->contLen > 0);
void *pVnode = vnodeAcquire(pHead->vgId); void *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg); code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++; if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
...@@ -74,7 +75,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { ...@@ -74,7 +75,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
} }
if (queuedMsgNum == 0) { if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
......
...@@ -188,6 +188,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -188,6 +188,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
int32_t numOfMsgs; int32_t numOfMsgs;
int32_t qtype; int32_t qtype;
taosBlockSIGPIPE();
dDebug("dnode vwrite worker:%d is running", pWorker->workerId); dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
while (1) { while (1) {
......
...@@ -28,7 +28,7 @@ extern "C" { ...@@ -28,7 +28,7 @@ extern "C" {
default: \ default: \
(_v) = (_finalType)GET_INT32_VAL(_data); \ (_v) = (_finalType)GET_INT32_VAL(_data); \
break; \ break; \
}; }
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -332,6 +332,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -332,6 +332,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break; break;
case 'N': case 'N':
arguments->data_batch = atoi(arg); arguments->data_batch = atoi(arg);
if (arguments->data_batch >= INT16_MAX) {
arguments->data_batch = INT16_MAX - 1;
}
break; break;
case 'L': case 'L':
{ {
......
...@@ -377,6 +377,24 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -377,6 +377,24 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
return code; return code;
} }
static bool mnodeAllOnline() {
void *pIter = NULL;
bool allOnline = true;
while (1) {
SMnodeObj *pMnode = NULL;
pIter = mnodeGetNextMnode(pIter, &pMnode);
if (pMnode == NULL) break;
if (pMnode->role != TAOS_SYNC_ROLE_MASTER && pMnode->role != TAOS_SYNC_ROLE_SLAVE) {
allOnline = false;
mnodeDecMnodeRef(pMnode);
}
}
mnodeCancelGetNextMnode(pIter);
return allOnline;
}
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
pMnode->mnodeId = dnodeId; pMnode->mnodeId = dnodeId;
...@@ -389,6 +407,11 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { ...@@ -389,6 +407,11 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
.fpRsp = mnodeCreateMnodeCb .fpRsp = mnodeCreateMnodeCb
}; };
if (needConfirm && !mnodeAllOnline()) {
mDebug("wait all mnode online then create new mnode");
return;
}
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (needConfirm) { if (needConfirm) {
code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp); code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp);
......
...@@ -1081,6 +1081,8 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1081,6 +1081,8 @@ static void *sdbWorkerFp(void *pWorker) {
int32_t qtype; int32_t qtype;
void * unUsed; void * unUsed;
taosBlockSIGPIPE();
while (1) { while (1) {
int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed); int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
......
...@@ -659,7 +659,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -659,7 +659,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
pShow->bytes[cols] = 4; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "onlineVnodes"); strcpy(pSchema[cols].name, "onlines");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -674,13 +674,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -674,13 +674,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
for (int32_t i = 0; i < pShow->maxReplica; ++i) { for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pShow->bytes[cols] = 2; pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dDnode", i + 1); snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE; pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dStatus", i + 1); snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
} }
......
...@@ -59,6 +59,7 @@ extern "C" { ...@@ -59,6 +59,7 @@ extern "C" {
// TAOS_OS_FUNC_SOCKET // TAOS_OS_FUNC_SOCKET
int32_t taosSetNonblocking(SOCKET sock, int32_t on); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE();
void taosBlockSIGPIPE(); void taosBlockSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
......
...@@ -39,6 +39,10 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { ...@@ -39,6 +39,10 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0; return 0;
} }
void taosIgnSIGPIPE() {
signal(SIGPIPE, SIG_IGN);
}
void taosBlockSIGPIPE() { void taosBlockSIGPIPE() {
sigset_t signal_mask; sigset_t signal_mask;
sigemptyset(&signal_mask); sigemptyset(&signal_mask);
......
...@@ -46,6 +46,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { ...@@ -46,6 +46,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0; return 0;
} }
void taosIgnSIGPIPE() {}
void taosBlockSIGPIPE() {} void taosBlockSIGPIPE() {}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
......
...@@ -33,13 +33,6 @@ struct SColumnFilterElem; ...@@ -33,13 +33,6 @@ struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SGroupResInfo {
int32_t groupId;
int32_t numOfDataPages;
int32_t pageId;
int32_t rowId;
} SGroupResInfo;
typedef struct SResultRowPool { typedef struct SResultRowPool {
int32_t elemSize; int32_t elemSize;
int32_t blockSize; int32_t blockSize;
...@@ -72,6 +65,12 @@ typedef struct SResultRow { ...@@ -72,6 +65,12 @@ typedef struct SResultRow {
union {STimeWindow win; char* key;}; // start key of current time window union {STimeWindow win; char* key;}; // start key of current time window
} SResultRow; } SResultRow;
typedef struct SGroupResInfo {
int32_t rowId;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
} SGroupResInfo;
/** /**
* If the number of generated results is greater than this value, * If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate. * query query will be halt and return results to client immediate.
...@@ -89,7 +88,6 @@ typedef struct SResultRowInfo { ...@@ -89,7 +88,6 @@ typedef struct SResultRowInfo {
int32_t size:24; // number of result set int32_t size:24; // number of result set
int32_t capacity; // max capacity int32_t capacity; // max capacity
int32_t curIndex; // current start active index int32_t curIndex; // current start active index
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key int64_t prevSKey; // previous (not completed) sliding window start key
} SResultRowInfo; } SResultRowInfo;
......
...@@ -67,7 +67,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto); ...@@ -67,7 +67,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto);
void tHistogramPrint(SHistogramInfo* pHisto); void tHistogramPrint(SHistogramInfo* pHisto);
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val); int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHeapEntry* tHeapCreate(int32_t numOfEntries); SHeapEntry* tHeapCreate(int32_t numOfEntries);
void tHeapSort(SHeapEntry* pEntry, int32_t len); void tHeapSort(SHeapEntry* pEntry, int32_t len);
......
...@@ -77,7 +77,6 @@ void* destroyResultRowPool(SResultRowPool* p); ...@@ -77,7 +77,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p); int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p); int32_t getNumOfUsedResultRows(SResultRowPool* p);
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv);
bool isPointInterpoQuery(SQuery *pQuery); bool isPointInterpoQuery(SQuery *pQuery);
......
此差异已折叠。
...@@ -158,8 +158,8 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -158,8 +158,8 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
} }
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
int32_t idx = vnodeHistobinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val); int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries); assert(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL);
if ((*pHisto)->elems[idx].val == val && idx >= 0) { if ((*pHisto)->elems[idx].val == val && idx >= 0) {
(*pHisto)->elems[idx].num += 1; (*pHisto)->elems[idx].num += 1;
...@@ -356,7 +356,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -356,7 +356,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
return 0; return 0;
} }
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val) { int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t end = len - 1; int32_t end = len - 1;
int32_t start = 0; int32_t start = 0;
...@@ -466,7 +466,7 @@ void tHistogramPrint(SHistogramInfo* pHisto) { ...@@ -466,7 +466,7 @@ void tHistogramPrint(SHistogramInfo* pHisto) {
*/ */
int64_t tHistogramSum(SHistogramInfo* pHisto, double v) { int64_t tHistogramSum(SHistogramInfo* pHisto, double v) {
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
int32_t slotIdx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, v); int32_t slotIdx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, v);
if (pHisto->elems[slotIdx].val != v) { if (pHisto->elems[slotIdx].val != v) {
slotIdx -= 1; slotIdx -= 1;
......
...@@ -96,8 +96,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo ...@@ -96,8 +96,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
pResultRowInfo->startTime = TSKEY_INITIAL_VAL;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
} }
...@@ -110,7 +108,7 @@ void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow ...@@ -110,7 +108,7 @@ void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
assert(num >= 0 && num <= numOfClosed); assert(num >= 0 && num <= numOfClosed);
int16_t type = pResultRowInfo->type; int16_t type = pResultRowInfo->type;
int64_t uid = getResultInfoUId(pRuntimeEnv); int64_t uid = 0;
char *key = NULL; char *key = NULL;
int16_t bytes = -1; int16_t bytes = -1;
...@@ -181,11 +179,12 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) { ...@@ -181,11 +179,12 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pResultRowInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]->closed) { SResultRow* pRow = pResultRowInfo->pResult[i];
if (pRow->closed) {
continue; continue;
} }
pResultRowInfo->pResult[i]->closed = true; pRow->closed = true;
} }
} }
...@@ -383,18 +382,4 @@ void* destroyResultRowPool(SResultRowPool* p) { ...@@ -383,18 +382,4 @@ void* destroyResultRowPool(SResultRowPool* p) {
tfree(p); tfree(p);
return NULL; return NULL;
}
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
if (!pRuntimeEnv->stableQuery) {
return 0; // for simple table query, the uid is always set to be 0;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) {
return 0;
}
STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable);
return id->uid;
} }
\ No newline at end of file
...@@ -21,19 +21,19 @@ TEST(testCase, histogram_binary_search) { ...@@ -21,19 +21,19 @@ TEST(testCase, histogram_binary_search) {
pHisto->elems[i].val = i; pHisto->elems[i].val = i;
} }
int32_t idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 1); int32_t idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 1);
assert(idx == 1); assert(idx == 1);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 9); idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 9);
assert(idx == 9); assert(idx == 9);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 20); idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 20);
assert(idx == 10); assert(idx == 10);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, -1); idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, -1);
assert(idx == 0); assert(idx == 0);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 3.9); idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 3.9);
assert(idx == 4); assert(idx == 4);
free(pHisto); free(pHisto);
......
...@@ -38,7 +38,7 @@ extern "C" { ...@@ -38,7 +38,7 @@ extern "C" {
#define SYNC_MAX_FWDS 512 #define SYNC_MAX_FWDS 512
#define SYNC_FWD_TIMER 300 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 15000 // ms #define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_CHECK_INTERVAL 1 // ms #define SYNC_CHECK_INTERVAL 1000 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms #define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
...@@ -86,9 +86,10 @@ typedef struct SsyncPeer { ...@@ -86,9 +86,10 @@ typedef struct SsyncPeer {
int32_t peerFd; // forward FD int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried int32_t numOfRetrieves; // number of retrieves tried
int32_t fileChanged; // a flag to indicate file is changed during retrieving process int32_t fileChanged; // a flag to indicate file is changed during retrieving process
int32_t refCount;
int64_t rid;
void * timer; void * timer;
void * pConn; void * pConn;
int32_t refCount; // reference count
struct SSyncNode *pSyncNode; struct SSyncNode *pSyncNode;
} SSyncPeer; } SSyncPeer;
...@@ -98,6 +99,7 @@ typedef struct SSyncNode { ...@@ -98,6 +99,7 @@ typedef struct SSyncNode {
int8_t quorum; int8_t quorum;
int8_t selfIndex; int8_t selfIndex;
uint32_t vgId; uint32_t vgId;
int32_t refCount;
int64_t rid; int64_t rid;
SSyncPeer * peerInfo[TAOS_SYNC_MAX_REPLICA + 1]; // extra one for arbitrator SSyncPeer * peerInfo[TAOS_SYNC_MAX_REPLICA + 1]; // extra one for arbitrator
SSyncPeer * pMaster; SSyncPeer * pMaster;
...@@ -121,13 +123,13 @@ extern int32_t tsSyncNum; ...@@ -121,13 +123,13 @@ extern int32_t tsSyncNum;
extern char tsNodeFqdn[TSDB_FQDN_LEN]; extern char tsNodeFqdn[TSDB_FQDN_LEN];
extern char * syncStatus[]; extern char * syncStatus[];
void *syncRetrieveData(void *param); void * syncRetrieveData(void *param);
void *syncRestoreData(void *param); void * syncRestoreData(void *param);
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer); void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode); void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer); SSyncPeer *syncAcquirePeer(int64_t rid);
int32_t syncDecPeerRef(SSyncPeer *pPeer); void syncReleasePeer(SSyncPeer *pPeer);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,14 +25,14 @@ typedef struct { ...@@ -25,14 +25,14 @@ typedef struct {
uint32_t serverIp; uint32_t serverIp;
int16_t port; int16_t port;
int32_t bufferSize; int32_t bufferSize;
void (*processBrokenLink)(void *ahandle); void (*processBrokenLink)(int64_t handleId);
int32_t (*processIncomingMsg)(void *ahandle, void *buffer); int32_t (*processIncomingMsg)(int64_t handleId, void *buffer);
void (*processIncomingConn)(int32_t fd, uint32_t ip); void (*processIncomingConn)(int32_t fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
void *syncOpenTcpThreadPool(SPoolInfo *pInfo); void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void syncCloseTcpThreadPool(void *); void syncCloseTcpThreadPool(void *);
void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd); void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd);
void syncFreeTcpConn(void *); void syncFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -29,8 +29,8 @@ ...@@ -29,8 +29,8 @@
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param); static void arbProcessBrokenLink(int64_t rid);
static int32_t arbProcessPeerMsg(void *param, void *buffer); static int32_t arbProcessPeerMsg(int64_t rid, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
static void * tsArbTcpPool; static void * tsArbTcpPool;
...@@ -138,20 +138,20 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -138,20 +138,20 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, arbitrator request is accepted", pNode->id); sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd; pNode->nodeFd = connFd;
pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd); pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, (int64_t)pNode, connFd);
return; return;
} }
static void arbProcessBrokenLink(void *param) { static void arbProcessBrokenLink(int64_t rid) {
SNodeConn *pNode = param; SNodeConn *pNode = (SNodeConn *)rid;
sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno)); sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno));
tfree(pNode); tfree(pNode);
} }
static int32_t arbProcessPeerMsg(void *param, void *buffer) { static int32_t arbProcessPeerMsg(int64_t rid, void *buffer) {
SNodeConn *pNode = param; SNodeConn *pNode = (SNodeConn *)rid;
SSyncHead head; SSyncHead head;
int32_t bytes = 0; int32_t bytes = 0;
char * cont = (char *)buffer; char * cont = (char *)buffer;
......
此差异已折叠。
...@@ -90,15 +90,18 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -90,15 +90,18 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
break; break;
} }
sDebug("%s, file:%s info is received from master, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id,
minfo.name, minfo.index, minfo.size, minfo.fversion, minfo.magic);
// remove extra files on slave between the current and last index // remove extra files on slave between the current and last index
syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1); syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1);
pindex = minfo.index; pindex = minfo.index;
// check the file info // check the file info
sinfo = minfo; sinfo = minfo;
sDebug("%s, get file:%s info size:%" PRId64, pPeer->id, minfo.name, minfo.size); sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion);
sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, sDebug("%s, local file:%s info, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id, sinfo.name,
&sinfo.fversion); sinfo.index, sinfo.size, sinfo.fversion, sinfo.magic);
// if file not there or magic is not the same, file shall be synced // if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(SFileAck)); memset(&fileAck, 0, sizeof(SFileAck));
...@@ -116,6 +119,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -116,6 +119,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
if (fileAck.sync == 0) { if (fileAck.sync == 0) {
sDebug("%s, %s is the same", pPeer->id, minfo.name); sDebug("%s, %s is the same", pPeer->id, minfo.name);
continue; continue;
} else {
sDebug("%s, %s will be received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
} }
// if sync is required, open file, receive from master, and write to file // if sync is required, open file, receive from master, and write to file
...@@ -155,7 +160,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -155,7 +160,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
return code; return code;
} }
static int32_t syncRestoreWal(SSyncPeer *pPeer) { static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int32_t ret, code = -1; int32_t ret, code = -1;
uint64_t lastVer = 0; uint64_t lastVer = 0;
...@@ -198,6 +203,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -198,6 +203,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
} }
free(pHead); free(pHead);
*wver = lastVer;
return code; return code;
} }
...@@ -321,12 +327,19 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -321,12 +327,19 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
nodeVersion = fversion; nodeVersion = fversion;
sInfo("%s, start to restore wal", pPeer->id); sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion);
if (syncRestoreWal(pPeer) < 0) { uint64_t wver = 0;
sError("%s, failed to restore wal", pPeer->id); code = syncRestoreWal(pPeer, &wver); // lastwar
if (code < 0) {
sError("%s, failed to restore wal, code:%d", pPeer->id, code);
return -1; return -1;
} }
if (wver != 0) {
nodeVersion = wver;
sDebug("%s, restore wal finished, set sver:%" PRIu64, pPeer->id, nodeVersion);
}
nodeSStatus = TAOS_SYNC_STATUS_CACHE; nodeSStatus = TAOS_SYNC_STATUS_CACHE;
sInfo("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
if (syncProcessBufferedFwd(pPeer) < 0) { if (syncProcessBufferedFwd(pPeer) < 0) {
...@@ -338,7 +351,10 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -338,7 +351,10 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
} }
void *syncRestoreData(void *param) { void *syncRestoreData(void *param) {
SSyncPeer *pPeer = param; int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
...@@ -369,7 +385,7 @@ void *syncRestoreData(void *param) { ...@@ -369,7 +385,7 @@ void *syncRestoreData(void *param) {
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
__sync_fetch_and_sub(&tsSyncNum, 1); __sync_fetch_and_sub(&tsSyncNum, 1);
syncDecPeerRef(pPeer); syncReleasePeer(pPeer);
return NULL; return NULL;
} }
...@@ -104,7 +104,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -104,7 +104,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion); &fileInfo.size, &fileInfo.fversion);
syncBuildFileInfo(&fileInfo, pNode->vgId); syncBuildFileInfo(&fileInfo, pNode->vgId);
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); sDebug("%s, file:%s info is sent, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id, fileInfo.name,
fileInfo.index, fileInfo.size, fileInfo.fversion, fileInfo.magic);
// send the file info // send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo)); int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
...@@ -144,6 +145,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -144,6 +145,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.index++; fileInfo.index++;
sDebug("%s, %s is the same", pPeer->id, fileInfo.name); sDebug("%s, %s is the same", pPeer->id, fileInfo.name);
continue; continue;
} else {
sDebug("%s, %s will be sent", pPeer->id, fileInfo.name);
} }
// get the full path to file // get the full path to file
...@@ -461,7 +464,10 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -461,7 +464,10 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
} }
void *syncRetrieveData(void *param) { void *syncRetrieveData(void *param) {
SSyncPeer *pPeer = (SSyncPeer *)param; int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
...@@ -490,7 +496,7 @@ void *syncRetrieveData(void *param) { ...@@ -490,7 +496,7 @@ void *syncRetrieveData(void *param) {
pPeer->fileChanged = 0; pPeer->fileChanged = 0;
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
syncDecPeerRef(pPeer); syncReleasePeer(pPeer);
return NULL; return NULL;
} }
...@@ -42,7 +42,7 @@ typedef struct SPoolObj { ...@@ -42,7 +42,7 @@ typedef struct SPoolObj {
typedef struct { typedef struct {
SThreadObj *pThread; SThreadObj *pThread;
void * ahandle; int64_t handleId;
int32_t fd; int32_t fd;
int32_t closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
...@@ -112,7 +112,7 @@ void syncCloseTcpThreadPool(void *param) { ...@@ -112,7 +112,7 @@ void syncCloseTcpThreadPool(void *param) {
tfree(pPool); tfree(pPool);
} }
void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = param; SPoolObj *pPool = param;
...@@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { ...@@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
pConn->fd = connFd; pConn->fd = connFd;
pConn->pThread = pThread; pConn->pThread = pThread;
pConn->ahandle = pPeer; pConn->handleId = rid;
pConn->closedByApp = 0; pConn->closedByApp = 0;
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP;
...@@ -164,7 +164,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -164,7 +164,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
SPoolInfo * pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR); if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
(*pInfo->processBrokenLink)(pConn->ahandle); (*pInfo->processBrokenLink)(pConn->handleId);
pThread->numOfFds--; pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
...@@ -221,7 +221,7 @@ static void *syncProcessTcpData(void *param) { ...@@ -221,7 +221,7 @@ static void *syncProcessTcpData(void *param) {
} }
if (pConn->closedByApp == 0) { if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { if ((*pInfo->processIncomingMsg)(pConn->handleId, buffer) < 0) {
syncFreeTcpConn(pConn); syncFreeTcpConn(pConn);
continue; continue;
} }
......
...@@ -31,19 +31,19 @@ ...@@ -31,19 +31,19 @@
extern "C" { extern "C" {
#endif #endif
extern int tsdbDebugFlag; extern uint32_t tsdbDebugFlag;
#define tsdbFatal(...) { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} #define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} #define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} #define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define TSDB_MAX_TABLE_SCHEMAS 16 #define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax))) #define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax)))
......
...@@ -161,6 +161,11 @@ _err: ...@@ -161,6 +161,11 @@ _err:
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
SMemTable *pIMem = pRepo->imem;
tsdbLockRepo(pRepo);
pRepo->imem = NULL;
tsdbUnlockRepo(pRepo);
tsdbUnRefMemTable(pRepo, pIMem);
sem_post(&(pRepo->readyToCommit)); sem_post(&(pRepo->readyToCommit));
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "tsdbMain.h" #include "tsdbMain.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable); static void tsdbFreeMemTable(SMemTable *pMemTable);
...@@ -205,7 +206,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -205,7 +206,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pRepo->mem == NULL) return 0; if (pRepo->mem == NULL) return 0;
SMemTable *pIMem = pRepo->imem; ASSERT(pRepo->imem == NULL);
sem_wait(&(pRepo->readyToCommit)); sem_wait(&(pRepo->readyToCommit));
...@@ -220,8 +221,6 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -220,8 +221,6 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
tsdbScheduleCommit(pRepo); tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
return 0; return 0;
} }
...@@ -606,19 +605,13 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -606,19 +605,13 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
STable * pTable = NULL; STable * pTable = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
SDataRow row = NULL; SDataRow row = NULL;
void ** rows = NULL; void * rows[TSDB_MAX_INSERT_BATCH] = {0};
int rowCounter = 0; int rowCounter = 0;
ASSERT(pBlock->tid < pMeta->maxTables); ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid]; pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
rows = (void **)calloc(pBlock->numOfRows, sizeof(void *));
if (rows == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter); tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) { if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
...@@ -632,9 +625,18 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -632,9 +625,18 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
if (rows[rowCounter] != NULL) { if (rows[rowCounter] != NULL) {
rowCounter++; rowCounter++;
} }
if (rowCounter == TSDB_MAX_INSERT_BATCH) {
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err;
}
rowCounter = 0;
memset(rows, 0, sizeof(rows));
}
} }
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { if (rowCounter > 0 && tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err; goto _err;
} }
...@@ -642,11 +644,9 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -642,11 +644,9 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
pRepo->stat.pointsWritten += points * schemaNCols(pSchema); pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema); pRepo->stat.totalStorage += points * schemaVLen(pSchema);
free(rows);
return 0; return 0;
_err: _err:
free(rows);
return -1; return -1;
} }
......
...@@ -956,9 +956,9 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl ...@@ -956,9 +956,9 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
return code; return code;
} }
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pTsCol = pQueryHandle->rhelper.pDataCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) { if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else { } else {
cur->pos = pBlock->numOfRows - 1; cur->pos = pBlock->numOfRows - 1;
} }
...@@ -1704,7 +1704,32 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1704,7 +1704,32 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) { static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists);
static int32_t getDataBlockRv(STsdbQueryHandle* pQueryHandle, STableBlockInfo* pNext, bool *exists) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
while(1) {
int32_t code = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getFirstFileDataBlock(pQueryHandle, exists);
} else { // next block of the same file
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
}
}
}
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists) {
pQueryHandle->numOfBlocks = 0; pQueryHandle->numOfBlocks = 0;
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
...@@ -1789,7 +1814,23 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex ...@@ -1789,7 +1814,23 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists); return getDataBlockRv(pQueryHandle, pBlockInfo, exists);
}
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
assert(cur != NULL && numOfBlocks > 0);
return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}
static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
assert(cur->slot < pQueryHandle->numOfBlocks && cur->slot >= 0);
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
} }
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) { static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
...@@ -1800,14 +1841,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists ...@@ -1800,14 +1841,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
if (!pQueryHandle->locateStart) { if (!pQueryHandle->locateStart) {
pQueryHandle->locateStart = true; pQueryHandle->locateStart = true;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision); int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
return getDataBlocksInFilesImpl(pQueryHandle, exists); return getFirstFileDataBlock(pQueryHandle, exists);
} else { } else {
// check if current file block is all consumed // check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
...@@ -1815,27 +1856,26 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists ...@@ -1815,27 +1856,26 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// current block is done, try next // current block is done, try next
if ((!cur->mixBlock) || cur->blockCompleted) { if ((!cur->mixBlock) || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) || // all data blocks in current file has been checked already, try next file if exists
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else {
// next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
}
} else { } else {
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, pQueryHandle->qinfo); tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos,
pQueryHandle->qinfo);
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0; *exists = (pQueryHandle->realNumOfRows > 0);
return code; if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
}
// current block is empty, try next block in file
// all data blocks in current file has been checked already, try next file if exists
if (isEndFileDataBlock(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
moveToNextDataBlockInCurrentFile(pQueryHandle);
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return getDataBlockRv(pQueryHandle, pNext, exists);
} }
} }
} }
......
...@@ -52,6 +52,8 @@ void *taosIterateRef(int rsetId, int64_t rid); ...@@ -52,6 +52,8 @@ void *taosIterateRef(int rsetId, int64_t rid);
// return the number of references in system // return the number of references in system
int taosListRef(); int taosListRef();
#define RID_VALID(x) ((x) > 0)
/* sample code to iterate the refs /* sample code to iterate the refs
void demoIterateRefs(int rsetId) { void demoIterateRefs(int rsetId) {
......
...@@ -547,13 +547,14 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) { ...@@ -547,13 +547,14 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
return; return;
} }
__cache_wr_lock(pCacheObj);
STrashElem *pElem = calloc(1, sizeof(STrashElem)); STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode; pElem->pData = pNode;
pElem->prev = NULL; pElem->prev = NULL;
pElem->next = NULL;
pNode->inTrashcan = true; pNode->inTrashcan = true;
pNode->pTNodeHeader = pElem; pNode->pTNodeHeader = pElem;
__cache_wr_lock(pCacheObj);
pElem->next = pCacheObj->pTrash; pElem->next = pCacheObj->pTrash;
if (pCacheObj->pTrash) { if (pCacheObj->pTrash) {
pCacheObj->pTrash->prev = pElem; pCacheObj->pTrash->prev = pElem;
...@@ -563,8 +564,8 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) { ...@@ -563,8 +564,8 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pCacheObj->numOfElemsInTrash++; pCacheObj->numOfElemsInTrash++;
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
pNode->key, pNode->data, pElem, pCacheObj->numOfElemsInTrash); pNode->data, pElem, pCacheObj->numOfElemsInTrash);
} }
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
......
...@@ -280,10 +280,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) { ...@@ -280,10 +280,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
tSkipListRLock(pSkipList); tSkipListRLock(pSkipList);
if (iter->order == TSDB_ORDER_ASC) { if (iter->order == TSDB_ORDER_ASC) {
if (iter->cur == pSkipList->pTail) { // no data in the skip list
if (iter->cur == pSkipList->pTail || iter->next == NULL) {
iter->cur = pSkipList->pTail;
tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return false; return false;
} }
iter->cur = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0); iter->cur = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it // a new node is inserted into between iter->cur and iter->next, ignore it
...@@ -295,9 +298,11 @@ bool tSkipListIterNext(SSkipListIterator *iter) { ...@@ -295,9 +298,11 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
iter->step++; iter->step++;
} else { } else {
if (iter->cur == pSkipList->pHead) { if (iter->cur == pSkipList->pHead) {
iter->cur = pSkipList->pHead;
tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return false; return false;
} }
iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0); iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it // a new node is inserted into between iter->cur and iter->next, ignore it
......
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
#include "tsocket.h" #include "tsocket.h"
#include "taoserror.h" #include "taoserror.h"
#ifndef SIGPIPE
#define SIGPIPE EPIPE
#endif
int32_t taosGetFqdn(char *fqdn) { int32_t taosGetFqdn(char *fqdn) {
char hostname[1024]; char hostname[1024];
hostname[1023] = '\0'; hostname[1023] = '\0';
...@@ -115,6 +119,10 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { ...@@ -115,6 +119,10 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
nleft -= nwritten; nleft -= nwritten;
ptr += nwritten; ptr += nwritten;
} }
if (errno == SIGPIPE || errno == EPIPE) {
return -1;
}
} }
return (nbytes - nleft); return (nbytes - nleft);
...@@ -142,6 +150,10 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { ...@@ -142,6 +150,10 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
nleft -= nread; nleft -= nread;
ptr += nread; ptr += nread;
} }
if (errno == SIGPIPE || errno == EPIPE) {
return -1;
}
} }
return (nbytes - nleft); return (nbytes - nleft);
......
...@@ -106,9 +106,10 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -106,9 +106,10 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
cJSON *vgCfgVersion = cJSON_GetObjectItem(root, "vgCfgVersion"); cJSON *vgCfgVersion = cJSON_GetObjectItem(root, "vgCfgVersion");
if (!vgCfgVersion || vgCfgVersion->type != cJSON_Number) { if (!vgCfgVersion || vgCfgVersion->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, vgCfgVersion not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, vgCfgVersion not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; vnodeMsg.cfg.vgCfgVersion = 0;
} else {
vnodeMsg.cfg.vgCfgVersion = vgCfgVersion->valueint;
} }
vnodeMsg.cfg.vgCfgVersion = vgCfgVersion->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
......
...@@ -89,7 +89,10 @@ static void vnodeIncRef(void *ptNode) { ...@@ -89,7 +89,10 @@ static void vnodeIncRef(void *ptNode) {
} }
void *vnodeAcquire(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); SVnodeObj **ppVnode = NULL;
if (tsVnodesHash != NULL) {
ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
}
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
......
...@@ -65,13 +65,17 @@ static int32_t vnodeCheckRead(SVnodeObj *pVnode) { ...@@ -65,13 +65,17 @@ static int32_t vnodeCheckRead(SVnodeObj *pVnode) {
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, replica:%d role:%s, refCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica, return TSDB_CODE_SUCCESS;
syncRole[pVnode->role], pVnode->refCount, pVnode); }
return TSDB_CODE_APP_NOT_READY;
if (tsEnableSlaveQuery && pVnode->role == TAOS_SYNC_ROLE_SLAVE) {
return TSDB_CODE_SUCCESS;
} }
return TSDB_CODE_SUCCESS; vDebug("vgId:%d, replica:%d role:%s, refCount:%d pVnode:%p, cant provide query service", pVnode->vgId, pVnode->syncCfg.replica,
syncRole[pVnode->role], pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
} }
void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) { void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
...@@ -303,8 +307,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -303,8 +307,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code = TSDB_CODE_QRY_HAS_RSP; code = TSDB_CODE_QRY_HAS_RSP;
} else { } else {
void *h1 = qGetResultRetrieveMsg(*qhandle); //void *h1 = qGetResultRetrieveMsg(*qhandle);
assert(h1 == NULL);
/* remove this assert, one possible case that will cause h1 not NULL: query thread unlock pQInfo->lock, and then FETCH thread execute twice before query thread reach here */
//assert(h1 == NULL);
freehandle = qQueryCompleted(*qhandle); freehandle = qQueryCompleted(*qhandle);
} }
......
...@@ -297,7 +297,7 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { ...@@ -297,7 +297,7 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0; if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0; if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
if (tsFlowCtrl == 0) { if (tsEnableFlowCtrl == 0) {
int32_t ms = pow(2, pVnode->flowctrlLevel + 2); int32_t ms = pow(2, pVnode->flowctrlLevel + 2);
if (ms > 100) ms = 100; if (ms > 100) ms = 100;
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms); vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms);
......
...@@ -50,12 +50,7 @@ pipeline { ...@@ -50,12 +50,7 @@ pipeline {
agent{label 'master'} agent{label 'master'}
steps { steps {
pre_test() pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
python3 concurrent_inquiry.py -c 1
'''
}
sh ''' sh '''
cd ${WKC}/tests cd ${WKC}/tests
./test-all.sh b1 ./test-all.sh b1
...@@ -82,53 +77,26 @@ pipeline { ...@@ -82,53 +77,26 @@ pipeline {
./handle_crash_gen_val_log.sh ./handle_crash_gen_val_log.sh
''' '''
} }
sh '''
cd ${WKC}/tests
./test-all.sh b2
date
'''
}
}
stage('test_valgrind') {
agent{label "186"}
steps {
pre_test()
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
date
cd ${WKC}/tests
./test-all.sh b3
date'''
}
}
stage('connector'){
agent{label "release"}
steps{
sh''' sh'''
cd ${WORKSPACE} systemctl start taosd
git checkout develop sleep 10
''' '''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh ''' sh '''
cd ${WORKSPACE}/tests/gotest cd ${WKC}/tests/gotest
bash batchtest.sh bash batchtest.sh
''' '''
} }
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh ''' sh '''
cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker cd ${WKC}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py python3 PythonChecker.py
''' '''
} }
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh ''' sh '''
cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/ cd ${WKC}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single >/dev/null mvn clean package assembly:single -DskipTests >/dev/null
java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1 java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
''' '''
} }
...@@ -138,9 +106,41 @@ pipeline { ...@@ -138,9 +106,41 @@ pipeline {
dotnet run dotnet run
''' '''
} }
sh '''
systemctl stop taosd
cd ${WKC}/tests
./test-all.sh b2
date
'''
}
}
stage('test_valgrind') {
agent{label "186"}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
nohup taosd >/dev/null &
sleep 10
python3 concurrent_inquiry.py -c 1
'''
}
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
} date
} cd ${WKC}/tests
./test-all.sh b3
date'''
}
}
stage('arm64_build'){ stage('arm64_build'){
agent{label 'arm64'} agent{label 'arm64'}
steps{ steps{
......
...@@ -388,7 +388,9 @@ class ConcurrentInquiry: ...@@ -388,7 +388,9 @@ class ConcurrentInquiry:
print( print(
"Failure thread%d, sql: %s \nexception: %s" % "Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e))) (threadID, str(sql),str(e)))
#exit(-1) err_uec='Unable to establish connection'
if err_uec in str(e) and loop >0:
exit(-1)
loop -= 1 loop -= 1
if loop == 0: break if loop == 0: break
...@@ -415,7 +417,9 @@ class ConcurrentInquiry: ...@@ -415,7 +417,9 @@ class ConcurrentInquiry:
print( print(
"Failure thread%d, sql: %s \nexception: %s" % "Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e))) (threadID, str(sql),str(e)))
#exit(-1) err_uec='Unable to establish connection'
if err_uec in str(e) and loop >0:
exit(-1)
loop -= 1 loop -= 1
if loop == 0: break if loop == 0: break
......
...@@ -2224,22 +2224,25 @@ class ClientManager: ...@@ -2224,22 +2224,25 @@ class ClientManager:
if svcMgr: # gConfig.auto_start_service: if svcMgr: # gConfig.auto_start_service:
svcMgr.stopTaosServices() svcMgr.stopTaosServices()
svcMgr = None svcMgr = None
# Print exec status, etc., AFTER showing messages from the server
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
# Linux return code: ref https://shapeshed.com/unix-exit-codes/
ret = 1 if self.tc.isFailed() else 0
self.tc.cleanup()
# Release global variables # Release global variables
gConfig = None gConfig = None
gSvcMgr = None gSvcMgr = None
logger = None logger = None
thPool = None
dbManager.cleanUp() # destructor wouldn't run in time
dbManager = None
# Print exec status, etc., AFTER showing messages from the server
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
# Linux return code: ref https://shapeshed.com/unix-exit-codes/
ret = 1 if self.tc.isFailed() else 0
self.tc.cleanup()
# Release variables here # Release variables here
self.tc = None self.tc = None
thPool = None
dbManager = None
gc.collect() # force garbage collection gc.collect() # force garbage collection
# h = hpy() # h = hpy()
......
...@@ -394,6 +394,7 @@ class DbManager(): ...@@ -394,6 +394,7 @@ class DbManager():
cType == 'native') else DbConn.createRest(dbTarget) cType == 'native') else DbConn.createRest(dbTarget)
try: try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
Logging.debug("DbManager opened DB connection...")
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if (err.msg == 'client disconnected'): # cannot open DB connection if (err.msg == 'client disconnected'): # cannot open DB connection
...@@ -412,6 +413,10 @@ class DbManager(): ...@@ -412,6 +413,10 @@ class DbManager():
# Moved to Database() # Moved to Database()
# self._stateMachine = StateMechine(self._dbConn) # self._stateMachine = StateMechine(self._dbConn)
def __del__(self):
''' Release the underlying DB connection upon deletion of DbManager '''
self.cleanUp()
def getDbConn(self): def getDbConn(self):
return self._dbConn return self._dbConn
...@@ -438,5 +443,8 @@ class DbManager(): ...@@ -438,5 +443,8 @@ class DbManager():
return "table_{}".format(tblNum) return "table_{}".format(tblNum)
def cleanUp(self): def cleanUp(self):
self._dbConn.close() if self._dbConn:
self._dbConn.close()
self._dbConn = None
Logging.debug("DbManager closed DB connection...")
...@@ -2,6 +2,7 @@ import threading ...@@ -2,6 +2,7 @@ import threading
import random import random
import logging import logging
import os import os
import sys
import taos import taos
...@@ -53,7 +54,7 @@ class Logging: ...@@ -53,7 +54,7 @@ class Logging:
# global misc.logger # global misc.logger
_logger = logging.getLogger('CrashGen') # real logger _logger = logging.getLogger('CrashGen') # real logger
_logger.addFilter(LoggingFilter()) _logger.addFilter(LoggingFilter())
ch = logging.StreamHandler() ch = logging.StreamHandler(sys.stdout) # Ref: https://stackoverflow.com/questions/14058453/making-python-loggers-output-all-messages-to-stdout-in-addition-to-log-file
_logger.addHandler(ch) _logger.addHandler(ch)
# Logging adapter, to be used as a logger # Logging adapter, to be used as a logger
......
...@@ -19,5 +19,5 @@ if __name__ == "__main__": ...@@ -19,5 +19,5 @@ if __name__ == "__main__":
mExec.init() mExec.init()
exitCode = mExec.run() exitCode = mExec.run()
print("Exiting with code: {}".format(exitCode)) print("\nCrash_Gen is now exiting with status code: {}".format(exitCode))
sys.exit(exitCode) sys.exit(exitCode)
...@@ -5,9 +5,10 @@ GREEN='\033[1;32m' ...@@ -5,9 +5,10 @@ GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m' GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m' GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m' NC='\033[0m'
nohup /var/lib/jenkins/workspace/TDinternal/debug/build/bin/taosd -c /var/lib/jenkins/workspace/TDinternal/community/sim/dnode1/cfg >/dev/null & #nohup /var/lib/jenkins/workspace/TDinternal/debug/build/bin/taosd -c /var/lib/jenkins/workspace/TDinternal/community/sim/dnode1/cfg >/dev/null &
nohup /root/TDinternal/debug/build/bin/taosd -c /root/TDinternal/community/sim/dnode1/cfg >/dev/null &
./crash_gen.sh --valgrind -p -t 10 -s 250 -b 4 ./crash_gen.sh --valgrind -p -t 10 -s 250 -b 4
pidof taosd|xargs kill pidof taosd|xargs kill -9
grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log
for memError in `grep 'ERROR SUMMARY' crash_gen_mem_err.log | awk '{print $4}'` for memError in `grep 'ERROR SUMMARY' crash_gen_mem_err.log | awk '{print $4}'`
...@@ -31,4 +32,4 @@ if [ -n "$defiMemError" ]; then ...@@ -31,4 +32,4 @@ if [ -n "$defiMemError" ]; then
exit 8 exit 8
fi fi
fi fi
done done
\ No newline at end of file
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -57,7 +57,7 @@ class TDTestCase: ...@@ -57,7 +57,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -57,7 +57,7 @@ class TDTestCase: ...@@ -57,7 +57,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -59,7 +59,7 @@ class TDTestCase: ...@@ -59,7 +59,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -60,7 +60,7 @@ class TDTestCase: ...@@ -60,7 +60,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -60,7 +60,7 @@ class TDTestCase: ...@@ -60,7 +60,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -62,7 +62,7 @@ class TDTestCase: ...@@ -62,7 +62,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -59,7 +59,7 @@ class TDTestCase: ...@@ -59,7 +59,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -59,7 +59,7 @@ class TDTestCase: ...@@ -59,7 +59,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -61,7 +61,7 @@ class TDTestCase: ...@@ -61,7 +61,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -59,7 +59,7 @@ class TDTestCase: ...@@ -59,7 +59,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -32,7 +32,7 @@ class TDTestCase: ...@@ -32,7 +32,7 @@ class TDTestCase:
tdDnodes.stop(1) tdDnodes.stop(1)
tdDnodes.deploy(1) tdDnodes.deploy(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
...@@ -60,9 +60,9 @@ class TDTestCase: ...@@ -60,9 +60,9 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdLog.info("================= step5") tdLog.info("================= step5")
tdLog.info("import 10 data totally repetitive") tdLog.info("import 10 data totally repetitive")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -57,7 +57,7 @@ class TDTestCase: ...@@ -57,7 +57,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -60,7 +60,7 @@ class TDTestCase: ...@@ -60,7 +60,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -57,9 +57,9 @@ class TDTestCase: ...@@ -57,9 +57,9 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdLog.info("================= step5") tdLog.info("================= step5")
tdLog.info("import 20 data later with partly overlap") tdLog.info("import 20 data later with partly overlap")
......
...@@ -57,7 +57,7 @@ class TDTestCase: ...@@ -57,7 +57,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -62,7 +62,7 @@ class TDTestCase: ...@@ -62,7 +62,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -54,7 +54,7 @@ class TDTestCase: ...@@ -54,7 +54,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -61,7 +61,7 @@ class TDTestCase: ...@@ -61,7 +61,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.stop(1) tdDnodes.stop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdLog.info("import 100 sequential data again") tdLog.info("import 100 sequential data again")
......
...@@ -53,7 +53,7 @@ class TDTestCase: ...@@ -53,7 +53,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -53,7 +53,7 @@ class TDTestCase: ...@@ -53,7 +53,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -55,7 +55,7 @@ class TDTestCase: ...@@ -55,7 +55,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -53,7 +53,7 @@ class TDTestCase: ...@@ -53,7 +53,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -53,7 +53,7 @@ class TDTestCase: ...@@ -53,7 +53,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -57,7 +57,7 @@ class TDTestCase: ...@@ -57,7 +57,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -57,7 +57,7 @@ class TDTestCase: ...@@ -57,7 +57,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -59,7 +59,7 @@ class TDTestCase: ...@@ -59,7 +59,7 @@ class TDTestCase:
tdLog.info("================= step4") tdLog.info("================= step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdLog.sleep(5) #tdLog.sleep(5)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info("================= step5") tdLog.info("================= step5")
......
...@@ -64,7 +64,7 @@ class TDTestCase: ...@@ -64,7 +64,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -64,7 +64,7 @@ class TDTestCase: ...@@ -64,7 +64,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -64,7 +64,7 @@ class TDTestCase: ...@@ -64,7 +64,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -68,7 +68,7 @@ class TDTestCase: ...@@ -68,7 +68,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -61,7 +61,7 @@ class TDTestCase: ...@@ -61,7 +61,7 @@ class TDTestCase:
tdLog.info("================= step5") tdLog.info("================= step5")
tdDnodes.forcestop(1) tdDnodes.forcestop(1)
tdDnodes.start(1) tdDnodes.start(1)
tdLog.sleep(10) #tdLog.sleep(10)
tdLog.info("================= step6") tdLog.info("================= step6")
tdSql.query('select * from tb1') tdSql.query('select * from tb1')
......
...@@ -16,7 +16,7 @@ import taos ...@@ -16,7 +16,7 @@ import taos
from util.log import tdLog from util.log import tdLog
from util.cases import tdCases from util.cases import tdCases
from util.sql import tdSql from util.sql import tdSql
from util.dnodes import tdDnodes
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
...@@ -44,6 +44,25 @@ class TDTestCase: ...@@ -44,6 +44,25 @@ class TDTestCase:
tdSql.query("select * from db.st where ts='2020-05-13 10:00:00.000'") tdSql.query("select * from db.st where ts='2020-05-13 10:00:00.000'")
tdSql.checkRows(1) tdSql.checkRows(1)
## test case for https://jira.taosdata.com:18080/browse/TD-2488
tdSql.execute("create table m1(ts timestamp, k int) tags(a int)")
tdSql.execute("create table t1 using m1 tags(1)")
tdSql.execute("create table t2 using m1 tags(2)")
tdSql.execute("insert into t1 values('2020-1-1 1:1:1', 1)")
tdSql.execute("insert into t1 values('2020-1-1 1:10:1', 2)")
tdSql.execute("insert into t2 values('2020-1-1 1:5:1', 99)")
tdSql.query("select count(*) from m1 where ts = '2020-1-1 1:5:1' ")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.query("select count(*) from m1 where ts = '2020-1-1 1:5:1' ")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册