diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 4cb2775ca24fbda4852635c4d349eaaf39c3c460..9123318d3bfa181bb4a21db12d6e8095c620fc4f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -237,9 +237,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL; } else { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID || - rpcMsg->code == TSDB_CODE_INVALID_VGROUP_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE || - rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) { + if (rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_INVALID_VGROUP_ID || + rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL) { /* * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized, * the virtual node may have not create table till now, so try again by using the new metermeta. diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 7534a324099fff01cf4ca47a1da51800ec3c3fb7..1ab14a4eae1758652f896436bbc40efc1133f10c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -579,9 +579,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_FETCH) && - (pRes->code != TSDB_CODE_QUERY_CANCELLED && ((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || - (pRes->code == TSDB_CODE_SUCCESS && pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)))) { + pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS && + ((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || + (pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows, diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4c8722ecea421d1509a99e7396b145dbb266b5d4..1925546222520e8714cf8cc46fc11ef6788e4eb6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1850,8 +1850,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void STableMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { - STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); - // todo handle error + STableMeta* pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup assert(pTableMeta != NULL); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList); } else { // transfer the ownership of pTableMeta to the newly create sql object. diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 4d15dc5a863273ed3fda017ee63405a9b8fd6973..826f4ff1c1deaf4742f0b247de23ffa2c163ada9 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -22,7 +22,7 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); -void dnodeDispatchToDnodeMgmt(SRpcMsg *rpcMsg); +void dnodeDispatchToMgmtQueue(SRpcMsg *rpcMsg); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 14c2a725d921f0d7f8215e792daac948df7b71ce..7c457defca9dd822b018221deeeb4646ed919988 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -22,6 +22,7 @@ #include "ttimer.h" #include "tsdb.h" #include "twal.h" +#include "tqueue.h" #include "tsync.h" #include "ttime.h" #include "ttimer.h" @@ -42,10 +43,12 @@ void * tsDnodeTmr = NULL; static void * tsStatusTimer = NULL; static uint32_t tsRebootTime; -static SRpcIpSet tsDMnodeIpSetForPeer = {0}; -static SRpcIpSet tsDMnodeIpSetForShell = {0}; +static SRpcIpSet tsDMnodeIpSet = {0}; static SDMMnodeInfos tsDMnodeInfos = {0}; static SDMDnodeCfg tsDnodeCfg = {0}; +static taos_qset tsMgmtQset = NULL; +static taos_queue tsMgmtQueue = NULL; +static pthread_t tsQthread; static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes); static bool dnodeReadMnodeInfos(); @@ -55,6 +58,7 @@ static bool dnodeReadDnodeCfg(); static void dnodeSaveDnodeCfg(); static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void dnodeSendStatusMsg(void *handle, void *tmrId); +static void *dnodeProcessMgmtQueue(void *param); static int32_t dnodeOpenVnodes(); static void dnodeCloseVnodes(); @@ -74,52 +78,64 @@ int32_t dnodeInitMgmt() { dnodeReadDnodeCfg(); tsRebootTime = taosGetTimestampSec(); - tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); - if (tsDnodeTmr == NULL) { - dError("failed to init dnode timer"); - return -1; - } - if (!dnodeReadMnodeInfos()) { - memset(&tsDMnodeIpSetForPeer, 0, sizeof(SRpcIpSet)); - memset(&tsDMnodeIpSetForShell, 0, sizeof(SRpcIpSet)); + memset(&tsDMnodeIpSet, 0, sizeof(SRpcIpSet)); memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos)); - tsDMnodeIpSetForPeer.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForPeer.fqdn[0], &tsDMnodeIpSetForPeer.port[0]); - tsDMnodeIpSetForPeer.port[0] += TSDB_PORT_DNODEDNODE; - - tsDMnodeIpSetForShell.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForShell.fqdn[0], &tsDMnodeIpSetForShell.port[0]); - tsDMnodeIpSetForShell.port[0] += TSDB_PORT_DNODESHELL; + tsDMnodeIpSet.numOfIps = 1; + taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSet.fqdn[0], &tsDMnodeIpSet.port[0]); if (strcmp(tsSecond, tsFirst) != 0) { - tsDMnodeIpSetForPeer.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForPeer.fqdn[1], &tsDMnodeIpSetForPeer.port[1]); - tsDMnodeIpSetForPeer.port[1] += TSDB_PORT_DNODEDNODE; - - tsDMnodeIpSetForShell.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForShell.fqdn[1], &tsDMnodeIpSetForShell.port[1]); - tsDMnodeIpSetForShell.port[1] += TSDB_PORT_DNODESHELL; + tsDMnodeIpSet.numOfIps = 2; + taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSet.fqdn[1], &tsDMnodeIpSet.port[1]); } } else { - tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum; + tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]); - tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE; + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); } + } - tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]); - tsDMnodeIpSetForShell.port[i] += TSDB_PORT_DNODESHELL; - } + // create the queue and thread to handle the message + tsMgmtQset = taosOpenQset(); + if (tsMgmtQset == NULL) { + dError("failed to create the mgmt queue set"); + dnodeCleanupMgmt(); + return -1; + } + + tsMgmtQueue = taosOpenQueue(); + if (tsMgmtQueue == NULL) { + dError("failed to create the mgmt queue"); + dnodeCleanupMgmt(); + return -1; + } + + taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL); + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + int32_t code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL); + pthread_attr_destroy(&thAttr); + if (code != 0) { + dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno)); + dnodeCleanupMgmt(); + return -1; } - int32_t code = dnodeOpenVnodes(); + code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { + dnodeCleanupMgmt(); + return -1; + } + + tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); + if (tsDnodeTmr == NULL) { + dError("failed to init dnode timer"); + dnodeCleanupMgmt(); return -1; } @@ -142,22 +158,62 @@ void dnodeCleanupMgmt() { } dnodeCloseVnodes(); + + if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset); + if (tsQthread) pthread_join(tsQthread, NULL); + + if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue); + if (tsMgmtQset) taosCloseQset(tsMgmtQset); + tsMgmtQset = NULL; + tsMgmtQueue = NULL; + } -void dnodeDispatchToDnodeMgmt(SRpcMsg *pMsg) { - SRpcMsg rsp; +void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { + void *item; - if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { - rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); + item = taosAllocateQitem(sizeof(SRpcMsg)); + if (item) { + memcpy(item, pMsg, sizeof(SRpcMsg)); + taosWriteQitem(tsMgmtQueue, 1, item); } else { - rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; + SRpcMsg rsp; + rsp.handle = pMsg->handle; + rsp.pCont = NULL; + rsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); } +} + +static void *dnodeProcessMgmtQueue(void *param) { + SRpcMsg *pMsg; + SRpcMsg rsp; + int type; + void *handle; + + while (1) { + if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { + dTrace("dnode mgmt got no message from qset, exit ..."); + break; + } + + dTrace("%p, msg:%s will be processed", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { + rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); + } else { + rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; + } + + rsp.handle = pMsg->handle; + rsp.pCont = NULL; + rpcSendResponse(&rsp); - rsp.handle = pMsg->handle; - rsp.pCont = NULL; - rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } - rpcFreeCont(pMsg->pCont); + return NULL; } static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { @@ -284,22 +340,26 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { } void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) { - dPrint("mnode IP list for peer is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); + dPrint("mnode IP list for is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); for (int i = 0; i < pIpSet->numOfIps; ++i) { + pIpSet->port[i] -= TSDB_PORT_DNODEDNODE; dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) } - tsDMnodeIpSetForPeer = *pIpSet; + tsDMnodeIpSet = *pIpSet; } void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) { SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSetForPeer; + *ipSet = tsDMnodeIpSet; + + for (int i=0; inumOfIps; ++i) + ipSet->port[i] += TSDB_PORT_DNODEDNODE; } void dnodeGetMnodeIpSetForShell(void *ipSetRaw) { SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSetForShell; + *ipSet = tsDMnodeIpSet; } static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { @@ -349,19 +409,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { dPrint("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); } - tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]); - tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE; - dPrint("mnode index:%d, for peer %s %d", i, tsDMnodeIpSetForPeer.fqdn[i], tsDMnodeIpSetForPeer.port[i]); - } - - tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum; + tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]); - dPrint("mnode index:%d, for shell %s %d", i, tsDMnodeIpSetForShell.fqdn[i], tsDMnodeIpSetForShell.port[i]); + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); } dnodeSaveMnodeInfos(); @@ -487,7 +538,7 @@ static void dnodeSaveMnodeInfos() { } char *dnodeGetMnodeMasterEp() { - return tsDMnodeInfos.nodeInfos[tsDMnodeIpSetForPeer.inUse].nodeEp; + return tsDMnodeInfos.nodeInfos[tsDMnodeIpSet.inUse].nodeEp; } void* dnodeGetMnodeInfos() { @@ -534,7 +585,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { .msgType = TSDB_MSG_TYPE_DM_STATUS }; - dnodeSendMsgToDnode(&tsDMnodeIpSetForPeer, &rpcMsg); + SRpcIpSet ipSet; + dnodeGetMnodeIpSetForPeer(&ipSet); + dnodeSendMsgToDnode(&ipSet, &rpcMsg); } static bool dnodeReadDnodeCfg() { diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 9a7b0837e8167a2a0b9452d1db47bf2c85a3fbee..ea3af08d7146706c7fed52a88055ebdbdb07aaef 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -43,10 +43,10 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 61e81dfeabeec110e7ae97a5e07e9e72882ef3ce..db910c9caea9331c888fc5181823682a91441fe7 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -93,16 +93,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_USER_FROM_CONN, 0, 0x0185, "can not get TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 0x0200, "table already exist") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_ID, 0, 0x0201, "invalid table id") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 0x0202, "invalid table typee") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE, 0, 0x0203, "invalid table name") -TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 0x0204, "no super table") // operation only available for super table -TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_TABLE, 0, 0x0205, "not active table") -TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 0x0206, "table id mismatch") -TAOS_DEFINE_ERROR(TSDB_CODE_TAG_ALREAY_EXIST, 0, 0x0207, "tag already exist") -TAOS_DEFINE_ERROR(TSDB_CODE_TAG_NOT_EXIST, 0, 0x0208, "tag not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_ALREAY_EXIST, 0, 0x0209, "field already exist") -TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_NOT_EXIST, 0, 0x020A, "field not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_COL_NAME_TOO_LONG, 0, 0x020B, "column name too long") -TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TAGS, 0, 0x020C, "too many tags") +TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 0x0203, "no super table") // operation only available for super table +TAOS_DEFINE_ERROR(TSDB_CODE_TAG_ALREAY_EXIST, 0, 0x0204, "tag already exist") +TAOS_DEFINE_ERROR(TSDB_CODE_TAG_NOT_EXIST, 0, 0x0205, "tag not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_ALREAY_EXIST, 0, 0x0206, "field already exist") +TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_NOT_EXIST, 0, 0x0207, "field not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_COL_NAME_TOO_LONG, 0, 0x0209, "column name too long") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TAGS, 0, 0x0209, "too many tags") // dnode & mnode @@ -147,7 +144,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x038F, "grant cpu li // server TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VGROUP_ID, 0, 0x0400, "invalid vgroup id") -TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_VNODE, 0, 0x0401, "not active vnode") TAOS_DEFINE_ERROR(TSDB_CODE_VG_INIT_FAILED, 0, 0x0402, "vgroup init failed") TAOS_DEFINE_ERROR(TSDB_CODE_SERV_NO_DISKSPACE, 0, 0x0403, "server no diskspace") TAOS_DEFINE_ERROR(TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0x0404, "server out of memory") diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index aacf8f419ff445e7ec1d5ef4f6066a52336fbd9a..594fd3fd50b0adede5e5ff759a43e8f2ddba9ec2 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -220,6 +220,7 @@ typedef struct SAcctObj { typedef struct { int8_t type; + int32_t index; char db[TSDB_DB_NAME_LEN + 1]; void * pIter; int16_t numOfColumns; @@ -228,7 +229,6 @@ typedef struct { int32_t numOfReads; int16_t offset[TSDB_MAX_COLUMNS]; int16_t bytes[TSDB_MAX_COLUMNS]; - void * signature; uint16_t payloadLen; char payload[]; } SShowObj; diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index ac8eb73296ea3ffd85b6a285b30b0dd3dae48258..d61145d9b82ffa643a52c0e4feb6c75443079153 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -27,7 +27,8 @@ void mnodeCleanupVgroups(); SVgObj *mnodeGetVgroup(int32_t vgId); void mnodeIncVgroupRef(SVgObj *pVgroup); void mnodeDecVgroupRef(SVgObj *pVgroup); -void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg); +void mnodeDropAllDbVgroups(SDbObj *pDropDb); +void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb); void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode); void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index b904e06e977ef56cfb21480b05332a5502cdee2e..0ad835279e44d76eeaf8aa7ee9f2cdc551bdda54 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -81,10 +81,10 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) { SDbObj *pDb = pOper->pObj; SAcctObj *pAcct = mnodeGetAcct(pDb->acct); - mnodeDropDbFromAcct(pAcct, pDb); mnodeDropAllChildTables(pDb); mnodeDropAllSuperTables(pDb); - mnodeDropAllDbVgroups(pDb, false); + mnodeDropAllDbVgroups(pDb); + mnodeDropDbFromAcct(pAcct, pDb); mnodeDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; @@ -998,19 +998,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { return code; } -#if 1 - mnodeDropAllDbVgroups(pMsg->pDb, true); -#else - SVgObj *pVgroup = pMsg->pDb->pHead; - if (pVgroup != NULL) { - mPrint("vgId:%d, will be dropped", pVgroup->vgId); - SMnodeMsg *newMsg = mnodeCloneMsg(pMsg); - newMsg->ahandle = pVgroup; - newMsg->expected = pVgroup->numOfVnodes; - mnodeDropVgroup(pVgroup, newMsg); - return; - } -#endif + mnodeSendDropAllDbVgroupsMsg(pMsg->pDb); mTrace("db:%s, all vgroups is dropped", pMsg->pDb->name); return mnodeDropDb(pMsg); diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index d28d0b5b307e47b68ad18c4c33abe8cf2d5e9d15..20616bfbcdae827f6bb2ca31273d349650f704f2 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -47,13 +47,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg); static void mnodeFreeShowObj(void *data); -static bool mnodeCheckShowObj(SShowObj *pShow); +static bool mnodeAccquireShowObj(SShowObj *pShow); static bool mnodeCheckShowFinished(SShowObj *pShow); -static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size); -static void mnodeCleanupShowObj(void *pShow, bool forceRemove); +static void *mnodePutShowObj(SShowObj *pShow, int32_t size); +static void mnodeReleaseShowObj(void *pShow, bool forceRemove); extern void *tsMnodeTmr; -static void *tsQhandleCache = NULL; +static void *tsMnodeShowCache = NULL; +static int32_t tsShowObjIndex = 0; static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; @@ -64,14 +65,15 @@ int32_t mnodeInitShow() { mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsQhandleCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); + tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); return 0; } void mnodeCleanUpShow() { - if (tsQhandleCache != NULL) { - taosCacheCleanup(tsQhandleCache); - tsQhandleCache = NULL; + if (tsMnodeShowCache != NULL) { + mPrint("show cache is cleanup"); + taosCacheCleanup(tsMnodeShowCache); + tsMnodeShowCache = NULL; } } @@ -118,13 +120,12 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen); SShowObj *pShow = (SShowObj *) calloc(1, showObjSize); - pShow->signature = pShow; pShow->type = pShowMsg->type; pShow->payloadLen = htons(pShowMsg->payloadLen); strcpy(pShow->db, pShowMsg->db); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - pShow = mnodeSaveShowObj(pShow, showObjSize); + pShow = mnodePutShowObj(pShow, showObjSize); if (pShow == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -132,21 +133,22 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; SCMShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { - mnodeFreeShowObj(pShow); + mnodeReleaseShowObj(pShow, true); return TSDB_CODE_SERV_OUT_OF_MEMORY; } pShowRsp->qhandle = htobe64((uint64_t) pShow); - mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); + mTrace("%p, show type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle); if (code == 0) { pMsg->rpcRsp.rsp = pShowRsp; pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns; + mnodeReleaseShowObj(pShow, false); return TSDB_CODE_SUCCESS; } else { - mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); + mError("%p, show type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); rpcFreeCont(pShowRsp); - mnodeCleanupShowObj(pShow, true); + mnodeReleaseShowObj(pShow, true); return code; } } @@ -159,22 +161,20 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { pRetrieve->qhandle = htobe64(pRetrieve->qhandle); SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; - mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); + mTrace("%p, show type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); /* * in case of server restart, apps may hold qhandle created by server before * restart, which is actually invalid, therefore, signature check is required. */ - if (!mnodeCheckShowObj(pShow)) { - mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pShow); + if (!mnodeAccquireShowObj(pShow)) { + mError("%p, show is invalid", pShow); return TSDB_CODE_INVALID_QHANDLE; } if (mnodeCheckShowFinished(pShow)) { - mTrace("retrieve:%p, qhandle:%p already read finished, numOfReads:%d numOfRows:%d", pRetrieve, pShow, pShow->numOfReads, pShow->numOfRows); + mTrace("%p, show is already read finished, numOfReads:%d numOfRows:%d", pShow, pShow->numOfReads, pShow->numOfRows); pShow->numOfReads = pShow->numOfRows; - //mnodeCleanupShowObj(pShow, true); - //return TSDB_CODE_SUCCESS; } if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -200,7 +200,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { if (rowsRead < 0) { rpcFreeCont(pRsp); - mnodeCleanupShowObj(pShow, false); + mnodeReleaseShowObj(pShow, false); assert(false); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -211,12 +211,13 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.len = size; - if (rowsToRead == 0) { - mnodeCleanupShowObj(pShow, true); + if (rowsToRead == 0 || rowsRead == rowsToRead) { + pRsp->completed = 1; + mnodeReleaseShowObj(pShow, true); } else { - mnodeCleanupShowObj(pShow, false); + mnodeReleaseShowObj(pShow, false); } - + return TSDB_CODE_SUCCESS; } @@ -318,24 +319,29 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) { return false; } -static bool mnodeCheckShowObj(SShowObj *pShow) { - SShowObj *pSaved = taosCacheAcquireByData(tsQhandleCache, pShow); +static bool mnodeAccquireShowObj(SShowObj *pShow) { + char key[10]; + sprintf(key, "%d", pShow->index); + + SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key); if (pSaved == pShow) { + mTrace("%p, show is accquired from cache", pShow); return true; } else { - mTrace("show:%p, is already released", pShow); return false; } } -static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) { - if (tsQhandleCache != NULL) { - char key[24]; - sprintf(key, "show:%p", pShow); - SShowObj *newQhandle = taosCachePut(tsQhandleCache, key, pShow, size, 60); +static void *mnodePutShowObj(SShowObj *pShow, int32_t size) { + if (tsMnodeShowCache != NULL) { + char key[10]; + pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); + sprintf(key, "%d", pShow->index); + + SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 60); free(pShow); - mTrace("show:%p, is saved", newQhandle); + mTrace("%p, show is put into cache", newQhandle); return newQhandle; } @@ -345,10 +351,10 @@ static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) { static void mnodeFreeShowObj(void *data) { SShowObj *pShow = data; sdbFreeIter(pShow->pIter); - mTrace("show:%p, is destroyed", pShow); + mTrace("%p, show is destroyed", pShow); } -static void mnodeCleanupShowObj(void *pShow, bool forceRemove) { - mTrace("show:%p, is released, force:%s", pShow, forceRemove ? "true" : "false"); - taosCacheRelease(tsQhandleCache, &pShow, forceRemove); +static void mnodeReleaseShowObj(void *pShow, bool forceRemove) { + mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false"); + taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove); } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index b2d7ec768f6dd229b7461d7bd449f1e3123213e5..84a1f659d58d6994940de5b568932403385f004c 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -148,37 +148,30 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) { return TSDB_CODE_INVALID_VGROUP_ID; } - SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - return TSDB_CODE_INVALID_VGROUP_ID; - } - mnodeDecVgroupRef(pVgroup); - - SDbObj *pDb = mnodeGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("ctable:%s, vgId:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); - return TSDB_CODE_INVALID_DB; - } - mnodeDecDbRef(pDb); - - SAcctObj *pAcct = mnodeGetAcct(pDb->acct); - if (pAcct == NULL) { - mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct); - return TSDB_CODE_INVALID_ACCT; - } - mnodeDecAcctRef(pAcct); + SVgObj *pVgroup = NULL; + SDbObj *pDb = NULL; + SAcctObj *pAcct = NULL; + + pVgroup = mnodeGetVgroup(pTable->vgId); + if (pVgroup != NULL) pDb = mnodeGetDb(pVgroup->dbName); + if (pDb != NULL) pAcct = mnodeGetAcct(pDb->acct); if (pTable->info.type == TSDB_CHILD_TABLE) { grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); - pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); + if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); mnodeRemoveTableFromStable(pTable->superTable, pTable); mnodeDecTableRef(pTable->superTable); } else { grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); - pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); + if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); } - mnodeRemoveTableFromDb(pDb); - mnodeRemoveTableFromVgroup(pVgroup, pTable); + + if (pDb != NULL) mnodeRemoveTableFromDb(pDb); + if (pVgroup != NULL) mnodeRemoveTableFromVgroup(pVgroup, pTable); + + mnodeDecVgroupRef(pVgroup); + mnodeDecDbRef(pDb); + mnodeDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; } @@ -693,10 +686,10 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { } if (pCreate->numOfTags != 0) { - mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); + mTrace("table:%s, create stable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateSuperTableMsg(pMsg); } else { - mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); + mTrace("table:%s, create ctable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateChildTableMsg(pMsg); } } @@ -721,7 +714,7 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SUCCESS; } else { mError("table:%s, failed to drop table, table not exist", pDrop->tableId); - return TSDB_CODE_INVALID_TABLE; + return TSDB_CODE_INVALID_TABLE_ID; } } @@ -749,7 +742,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { if (pMsg->pTable == NULL) { if (!pInfo->createFlag) { mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); - return TSDB_CODE_INVALID_TABLE; + return TSDB_CODE_INVALID_TABLE_ID; } else { mTrace("table:%s, failed to get table meta, start auto create table ", pInfo->tableId); return mnodeAutoCreateChildTable(pMsg); @@ -786,7 +779,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { if (pStable->schema == NULL) { free(pStable); mError("table:%s, failed to create, no schema input", pCreate->tableId); - return TSDB_CODE_INVALID_TABLE; + return TSDB_CODE_INVALID_TABLE_ID; } memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -1276,9 +1269,9 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i; SSuperTableObj *pTable = mnodeGetSuperTable(stableName); - if (pTable->vgHash != NULL) { + if (pTable != NULL && pTable->vgHash != NULL) { contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo)); - } + } mnodeDecTableRef(pTable); } @@ -1287,12 +1280,23 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } - pRsp->numOfTables = htonl(numOfTable); - char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg); + pRsp->numOfTables = 0; + char *msg = (char *)pRsp + sizeof(SCMSTableVgroupRspMsg); for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i; SSuperTableObj *pTable = mnodeGetSuperTable(stableName); + if (pTable == NULL) { + mError("stable:%s, not exist while get stable vgroup info", stableName); + mnodeDecTableRef(pTable); + continue; + } + if (pTable->vgHash == NULL) { + mError("stable:%s, not vgroup exist while get stable vgroup info", stableName); + mnodeDecTableRef(pTable); + continue; + } + SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg; SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash); @@ -1318,17 +1322,25 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { } taosHashDestroyIter(pIter); + mnodeDecTableRef(pTable); pVgroupInfo->numOfVgroups = htonl(vgSize); // one table is done, try the next table msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo); + pRsp->numOfTables++; } - pMsg->rpcRsp.rsp = pRsp; - pMsg->rpcRsp.len = msg - (char *)pRsp; + if (pRsp->numOfTables != numOfTable) { + rpcFreeCont(pRsp); + return TSDB_CODE_INVALID_TABLE_ID; + } else { + pRsp->numOfTables = htonl(pRsp->numOfTables); + pMsg->rpcRsp.rsp = pRsp; + pMsg->rpcRsp.len = msg - (char *)pRsp; - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; + } } static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) { @@ -1429,8 +1441,8 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb SSuperTableObj *pSuperTable = mnodeGetSuperTable(pTagData->name); if (pSuperTable == NULL) { mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name); - free(pTable); - terrno = TSDB_CODE_INVALID_TABLE; + mnodeDestroyChildTable(pTable); + terrno = TSDB_CODE_INVALID_TABLE_ID; return NULL; } mnodeDecTableRef(pSuperTable); @@ -1738,7 +1750,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; - STagData* pTag = (STagData*)pInfo->tags; + STagData *pTag = (STagData *)pInfo->tags; int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen); SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); @@ -1754,14 +1766,13 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { pCreateMsg->contLen = htonl(contLen); memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg)); + mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, ((STagData *)(pCreateMsg->schema))->name); rpcFreeCont(pMsg->rpcMsg.pCont); pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; pMsg->rpcMsg.pCont = pCreateMsg; pMsg->rpcMsg.contLen = contLen; - - mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, pInfo->tags); - + return TSDB_CODE_ACTION_NEED_REPROCESSED; } @@ -1888,7 +1899,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) { SChildTableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid); if (pTable == NULL) { mError("dnode:%d, vgId:%d sid:%d, table not found", pCfg->dnodeId, pCfg->vgId, pCfg->sid); - return TSDB_CODE_NOT_ACTIVE_TABLE; + return TSDB_CODE_INVALID_TABLE_ID; } SMDCreateTableMsg *pCreate = NULL; @@ -2199,7 +2210,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pAlter->tableId); if (pMsg->pTable == NULL) { mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId); - return TSDB_CODE_INVALID_TABLE; + return TSDB_CODE_INVALID_TABLE_ID; } pAlter->type = htons(pAlter->type); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index c8ff6566b6b537f576833ed74097e91b2a5746a0..9c24f7eaf3e3e6d0d88e2a27fe4f9b72db1956f2 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -716,14 +716,14 @@ static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) { SDnodeObj *pDnode = mnodeGetDnode(pCfg->dnodeId); if (pDnode == NULL) { mTrace("dnode:%s, invalid dnode", taosIpStr(pCfg->dnodeId), pCfg->vgId); - return TSDB_CODE_NOT_ACTIVE_VNODE; + return TSDB_CODE_INVALID_VGROUP_ID; } mnodeDecDnodeRef(pDnode); SVgObj *pVgroup = mnodeGetVgroup(pCfg->vgId); if (pVgroup == NULL) { mTrace("dnode:%s, vgId:%d, no vgroup info", taosIpStr(pCfg->dnodeId), pCfg->vgId); - return TSDB_CODE_NOT_ACTIVE_VNODE; + return TSDB_CODE_INVALID_VGROUP_ID; } mnodeDecVgroupRef(pVgroup); @@ -784,7 +784,7 @@ void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb) { mPrint("db:%s, all vgroups is updated in sdb", pAlterDb->name); } -void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { +void mnodeDropAllDbVgroups(SDbObj *pDropDb) { void * pIter = NULL; int32_t numOfVgroups = 0; SVgObj *pVgroup = NULL; @@ -802,10 +802,6 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { }; sdbDeleteRow(&oper); numOfVgroups++; - - if (sendMsg) { - mnodeSendDropVgroupMsg(pVgroup, NULL); - } } mnodeDecVgroupRef(pVgroup); @@ -815,3 +811,25 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups); } + +void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb) { + void * pIter = NULL; + int32_t numOfVgroups = 0; + SVgObj *pVgroup = NULL; + + mPrint("db:%s, all vgroups will be dropped in dnode", pDropDb->name); + while (1) { + pIter = mnodeGetNextVgroup(pIter, &pVgroup); + if (pVgroup == NULL) break; + + if (pVgroup->pDb == pDropDb) { + mnodeSendDropVgroupMsg(pVgroup, NULL); + } + + mnodeDecVgroupRef(pVgroup); + } + + sdbFreeIter(pIter); + + mPrint("db:%s, all vgroups:%d drop msg is sent to dnode", pDropDb->name, numOfVgroups); +} \ No newline at end of file diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index cbca19b57f0b2875ec08688ba9bcfe09a36c9116..74e3c409a1cc6932c7f3929d73f7da6469176e03 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -445,10 +445,10 @@ void httpJsonPairStatus(JsonBuf* buf, int code) { httpJsonItemToken(buf); if (code == TSDB_CODE_DB_NOT_SELECTED) { httpJsonPair(buf, "desc", 4, "failed to create database", 23); - } else if (code == TSDB_CODE_INVALID_TABLE) { + } else if (code == TSDB_CODE_INVALID_TABLE_ID) { httpJsonPair(buf, "desc", 4, "failed to create table", 22); } else httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int)strlen(tstrerror(code))); } } -} \ No newline at end of file +} diff --git a/src/plugins/http/src/tgJson.c b/src/plugins/http/src/tgJson.c index cb9c42e7920c57f276c52d013870e0623d77ee43..27059010b8713340de639feba6e0fb05c2554afb 100644 --- a/src/plugins/http/src/tgJson.c +++ b/src/plugins/http/src/tgJson.c @@ -111,7 +111,7 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { pContext->ipstr); return false; } - } else if (code == TSDB_CODE_INVALID_TABLE) { + } else if (code == TSDB_CODE_INVALID_TABLE_ID) { cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; if (multiCmds->cmds[multiCmds->pos - 1].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) { multiCmds->pos = (int16_t)(multiCmds->pos - 2); @@ -151,4 +151,4 @@ void tgSetNextCmd(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { } else { multiCmds->pos++; } -} \ No newline at end of file +} diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 297ff31ed986f055a6eab51244e80683bdea29cd..fa46c3a0f35aaa8076815bf6f7de6d8f802f590b 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -420,13 +420,13 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pConn->rspMsgLen = msgLen; if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; - rpcUnlockConn(pConn); - taosTmrStopA(&pConn->pTimer); // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured + rpcUnlockConn(pConn); + return; } @@ -1095,10 +1095,10 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->reqMsgLen = msgLen; pConn->pContext = pContext; - rpcUnlockConn(pConn); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcSendMsgToPeer(pConn, msg, msgLen); + + rpcUnlockConn(pConn); } static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 03407a5aa217306c5480a50f30fe09f0aceec538..c41171a36e1fe4292ba059495dc01a39c1bc5782 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -43,7 +43,7 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, return TSDB_CODE_MSG_NOT_PROCESSED; if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) - return TSDB_CODE_NOT_ACTIVE_VNODE; + return TSDB_CODE_INVALID_VGROUP_ID; return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 6d65d10335ee79530c7ed0ae8648a617ad927117..09cb2d3fac89f634277ed9133c7723cf1f29cccc 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -53,7 +53,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) - return TSDB_CODE_NOT_ACTIVE_VNODE; + return TSDB_CODE_INVALID_VGROUP_ID; // it may be in deleting or closing state if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) return TSDB_CODE_NOT_READY; diff --git a/tests/perftest-scripts/perftest-csv2png.gnuplot b/tests/perftest-scripts/perftest-csv2png.gnuplot new file mode 100644 index 0000000000000000000000000000000000000000..9c34ebe403fe7317c963ab8693069025acc79a64 --- /dev/null +++ b/tests/perftest-scripts/perftest-csv2png.gnuplot @@ -0,0 +1,33 @@ +#!/user/bin/gnuplot +reset +set terminal png + +set title "Performance Test Report" font ",20" + +set ylabel "Time in Seconds" + +set xdata time +set timefmt "%Y%m%d" +set format x "%Y-%m-%d" +set xlabel "Date" + +set style data linespoints + +set terminal pngcairo size 1024,768 enhanced font 'Segoe UI, 10' +set output filename . '.png' +set datafile separator ',' + +set key reverse Left outside +set grid + +# plot 'perftest-influx-report.csv' using 1:2 title "InfluxDB Write", \ +# "" using 1:3 title "InfluxDB Query case1", \ +# "" using 1:4 title "InfluxDB Query case2", \ +# "" using 1:5 title "InfluxDB Query case3", \ +# "" using 1:6 title "InfluxDB Query case4" +# +plot filename . '.csv' using 1:2 title "TDengine Write", \ + "" using 1:3 title "TDengine Query case1", \ + "" using 1:4 title "TDengine Query case2", \ + "" using 1:5 title "TDengine Query case3", \ + "" using 1:6 title "TDengine Query case4" diff --git a/tests/perftest-scripts/perftest-daily.sh b/tests/perftest-scripts/perftest-daily.sh new file mode 100755 index 0000000000000000000000000000000000000000..894d9c7905170fdab1d6a7392f2170cf72e39fa3 --- /dev/null +++ b/tests/perftest-scripts/perftest-daily.sh @@ -0,0 +1,95 @@ +#!/bin/bash + +# Coloured Echoes # +function red_echo { echo -e "\033[31m$@\033[0m"; } # +function green_echo { echo -e "\033[32m$@\033[0m"; } # +function yellow_echo { echo -e "\033[33m$@\033[0m"; } # +function white_echo { echo -e "\033[1;37m$@\033[0m"; } # +# Coloured Printfs # +function red_printf { printf "\033[31m$@\033[0m"; } # +function green_printf { printf "\033[32m$@\033[0m"; } # +function yellow_printf { printf "\033[33m$@\033[0m"; } # +function white_printf { printf "\033[1;37m$@\033[0m"; } # +# Debugging Outputs # +function white_brackets { local args="$@"; white_printf "["; printf "${args}"; white_printf "]"; } # +function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") && echo " ${args}"; } # +function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; } # +function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; } # + +function set-Wal { + echo "/etc/taos/taos.cfg walLevel will be set to $1" + sed -i 's/^walLevel.*$/walLevel '"$1"'/g' /etc/taos/taos.cfg +} + +function collectSysInfo { + rm sysinfo.log + grep model /proc/cpuinfo | tail -n1 | tee sysinfo.log + grep cores /proc/cpuinfo | tail -n1 | tee -a sysinfo.log + grep MemTotal /proc/meminfo | tee -a sysinfo.log + grep "^[^#;]" /etc/taos/taos.cfg | tee taos.cfg +} + +function buildTDengine { + cd /root/TDengine + git pull + cd debug + rm -rf * + cmake .. + make > /dev/null + make install +} + +function restartTaosd { + systemctl stop taosd + pkill -KILL -x taosd + sleep 10 + + rm -rf /mnt/var/log/taos/* + rm -rf /mnt/var/lib/taos/* + + taosd 2>&1 > /dev/null & + sleep 10 +} + +function sendReport { + receiver="sdsang@taosdata.com, sangshuduo@gmail.com" + mimebody="MIME-Version: 1.0\nContent-Type: text/html; charset=utf-8\n" + + echo -e "to: ${receiver}\nsubject: Perf test report ${today}\n" | \ + (cat - && uuencode perftest-1d-$today.log perftest-1d-$today.log)| \ + (cat - && uuencode perftest-1d-report.csv perftest-1d-report-$today.csv) | \ + (cat - && uuencode perftest-1d-report.png perftest-1d-report-$today.png) | \ + (cat - && uuencode perftest-13d-$today.log perftest-13d-$today.log)| \ + (cat - && uuencode perftest-13d-report.csv perftest-13d-report-$today.csv) | \ + (cat - && uuencode perftest-13d-report.png perftest-13d-report-$today.png) | \ + (cat - && uuencode taosdemo-$today.log taosdemo-$today.log) | \ + (cat - && uuencode taosdemo-report.csv taosdemo-report-$today.csv) | \ + (cat - && uuencode taosdemo-report.png taosdemo-report-$today.png) | \ + (cat - && uuencode sysinfo.log sysinfo.txt) | \ + (cat - && uuencode taos.cfg taos-cfg-$today.txt) | \ + ssmtp "${receiver}" +} + +today=`date +"%Y%m%d"` +cd /root +echo -e "cron-ran-at-${today}" >> cron.log + +echoInfo "Build TDengine" +buildTDengine + +set-Wal "2" + +cd /root +./perftest-tsdb-compare-1d.sh + +cd /root +./perftest-tsdb-compare-13d.sh + +cd /root +./perftest-taosdemo.sh + +collectSysInfo + +echoInfo "Send Report" +sendReport +echoInfo "End of Test" diff --git a/tests/perftest-scripts/perftest-taosdemo.sh b/tests/perftest-scripts/perftest-taosdemo.sh new file mode 100755 index 0000000000000000000000000000000000000000..511ec22fec47a496b5e79b35c8d0b42d61d0a336 --- /dev/null +++ b/tests/perftest-scripts/perftest-taosdemo.sh @@ -0,0 +1,70 @@ +#!/bin/bash + +# Coloured Echoes # +function red_echo { echo -e "\033[31m$@\033[0m"; } # +function green_echo { echo -e "\033[32m$@\033[0m"; } # +function yellow_echo { echo -e "\033[33m$@\033[0m"; } # +function white_echo { echo -e "\033[1;37m$@\033[0m"; } # +# Coloured Printfs # +function red_printf { printf "\033[31m$@\033[0m"; } # +function green_printf { printf "\033[32m$@\033[0m"; } # +function yellow_printf { printf "\033[33m$@\033[0m"; } # +function white_printf { printf "\033[1;37m$@\033[0m"; } # +# Debugging Outputs # +function white_brackets { local args="$@"; white_printf "["; printf "${args}"; white_printf "]"; } # +function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") && echo " ${args}"; } # +function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; } # +function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; } # + +function restartTaosd { + systemctl stop taosd + pkill -KILL -x taosd + sleep 10 + + rm -rf /mnt/var/log/taos/* + rm -rf /mnt/var/lib/taos/* + + taosd 2>&1 > /dev/null & + sleep 10 +} + +function runCreateTableOnly { + echoInfo "Restart Taosd" + restartTaosd + + /usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo -n 0 2>&1 | tee taosdemo-$today.log" + demoTableOnly=`grep "Total:" totaltime.out|awk '{print $2}'` +} + +function runCreateTableThenInsert { + echoInfo "Restart Taosd" + restartTaosd + + /usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo 2>&1 | tee -a taosdemo-$today.log" + demoTableAndInsert=`grep "Total:" totaltime.out|awk '{print $2}'` + demoRPS=`grep "records\/second" taosdemo-$today.log | tail -n1 | awk '{print $13}'` +} + +function generateTaosdemoPlot { + echo "${today}, demoTableOnly: ${demoTableOnly}, demoTableAndInsert: ${demoTableAndInsert}" | tee -a taosdemo-$today.log + echo "${today}, ${demoTableOnly}, ${demoTableAndInsert}, ${demoRPS}" >> taosdemo-report.csv + + csvLines=`cat taosdemo-report.csv | wc -l` + + if [ "$csvLines" -gt "10" ]; then + sed -i '1d' taosdemo-report.csv + fi + + gnuplot -p taosdemo-csv2png.gnuplot +} + +today=`date +"%Y%m%d"` + +cd /root +echoInfo "Test Create Table Only " +runCreateTableOnly +echoInfo "Test Create Table then Insert data" +runCreateTableThenInsert +echoInfo "Generate plot for taosdemo" +generateTaosdemoPlot +echoInfo "End of TaosDemo Test" diff --git a/tests/perftest-scripts/perftest-tsdb-compare-13d.sh b/tests/perftest-scripts/perftest-tsdb-compare-13d.sh new file mode 100755 index 0000000000000000000000000000000000000000..4b3ed6818c433c96fdde2513a3e38bb3110dfdbf --- /dev/null +++ b/tests/perftest-scripts/perftest-tsdb-compare-13d.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +# Coloured Echoes # +function red_echo { echo -e "\033[31m$@\033[0m"; } # +function green_echo { echo -e "\033[32m$@\033[0m"; } # +function yellow_echo { echo -e "\033[33m$@\033[0m"; } # +function white_echo { echo -e "\033[1;37m$@\033[0m"; } # +# Coloured Printfs # +function red_printf { printf "\033[31m$@\033[0m"; } # +function green_printf { printf "\033[32m$@\033[0m"; } # +function yellow_printf { printf "\033[33m$@\033[0m"; } # +function white_printf { printf "\033[1;37m$@\033[0m"; } # +# Debugging Outputs # +function white_brackets { local args="$@"; white_printf "["; printf "${args}"; white_printf "]"; } # +function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") && echo " ${args}"; } # +function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; } # +function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; } # + +function restartTaosd { + systemctl stop taosd + pkill -KILL -x taosd + sleep 10 + + rm -rf /mnt/var/log/taos/* + rm -rf /mnt/var/lib/taos/* + + taosd 2>&1 > /dev/null & + sleep 10 +} + +function runPerfTest13d { + echoInfo "Restart Taosd" + restartTaosd + + cd /home/taos/tliu/timeseriesdatabase-comparisons/build/tsdbcompare + ./runreal-13d-csv.sh 2>&1 | tee /root/perftest-13d-$today.log +} + +function generatePerfPlot13d { + cd /root + + csvLines=`cat perftest-13d-report.csv | wc -l` + + if [ "$csvLines" -gt "10" ]; then + sed -i '1d' perftest-13d-report.csv + fi + + gnuplot -e "filename='perftest-13d-report'" -p perftest-csv2png.gnuplot +} + +today=`date +"%Y%m%d"` +cd /root + +echoInfo "run Performance Test with 13 days data" +runPerfTest13d +echoInfo "Generate plot of 13 days data" +generatePerfPlot13d +echoInfo "End of TSDB-Compare 13-days-data Test" diff --git a/tests/perftest-scripts/perftest-tsdb-compare-1d.sh b/tests/perftest-scripts/perftest-tsdb-compare-1d.sh new file mode 100755 index 0000000000000000000000000000000000000000..ebe34cde72ce3d6af4ff03ac242c73efecb53475 --- /dev/null +++ b/tests/perftest-scripts/perftest-tsdb-compare-1d.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +# Coloured Echoes # +function red_echo { echo -e "\033[31m$@\033[0m"; } # +function green_echo { echo -e "\033[32m$@\033[0m"; } # +function yellow_echo { echo -e "\033[33m$@\033[0m"; } # +function white_echo { echo -e "\033[1;37m$@\033[0m"; } # +# Coloured Printfs # +function red_printf { printf "\033[31m$@\033[0m"; } # +function green_printf { printf "\033[32m$@\033[0m"; } # +function yellow_printf { printf "\033[33m$@\033[0m"; } # +function white_printf { printf "\033[1;37m$@\033[0m"; } # +# Debugging Outputs # +function white_brackets { local args="$@"; white_printf "["; printf "${args}"; white_printf "]"; } # +function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") && echo " ${args}"; } # +function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; } # +function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; } # + +function restartTaosd { + systemctl stop taosd + pkill -KILL -x taosd + sleep 10 + + rm -rf /mnt/var/log/taos/* + rm -rf /mnt/var/lib/taos/* + + taosd 2>&1 > /dev/null & + sleep 10 +} + +function runPerfTest1d { + echoInfo "Restart Taosd" + restartTaosd + + cd /home/taos/tliu/timeseriesdatabase-comparisons/build/tsdbcompare + ./runreal-1d-csv.sh 2>&1 | tee /root/perftest-1d-$today.log +} + +function generatePerfPlot1d { + cd /root + + csvLines=`cat perftest-1d-report.csv | wc -l` + + if [ "$csvLines" -gt "10" ]; then + sed -i '2d' perftest-1d-report.csv + fi + + gnuplot -e "filename='perftest-1d-report'" -p perftest-csv2png.gnuplot +} + +today=`date +"%Y%m%d"` +cd /root + +echoInfo "run Performance Test with 1 day data" +runPerfTest1d +echoInfo "Generate plot of 1 day data" +generatePerfPlot1d +echoInfo "End of TSDB-Compare 1-day-data Test" diff --git a/tests/perftest-scripts/run-csv.sh b/tests/perftest-scripts/run-csv.sh new file mode 100755 index 0000000000000000000000000000000000000000..afa1d5df5de0c9780d65ff12ae680a323851444f --- /dev/null +++ b/tests/perftest-scripts/run-csv.sh @@ -0,0 +1,209 @@ +#!/bin/bash + + +# Color setting +RED='\033[0;31m' +GREEN='\033[1;32m' +GREEN_DARK='\033[0;32m' +GREEN_UNDERLINE='\033[4;32m' +NC='\033[0m' +docker rm -f `docker ps -a -q` +#set -x +echo +echo "---------------Generating Data-----------------" +echo +echo "Prepare data for InfluxDB...." +#bin/bulk_data_gen -seed 123 -format influx-bulk -scale-var 100 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" >data/influx.dat +bin/bulk_data_gen -seed 123 -format influx-bulk -sampling-interval 1s -scale-var 10 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" >data/influx.dat + +echo +echo "Prepare data for TDengine...." +#bin/bulk_data_gen -seed 123 -format tdengine -tdschema-file config/TDengineSchema.toml -scale-var 100 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" > data/tdengine.dat +bin/bulk_data_gen -seed 123 -format tdengine -sampling-interval 1s -tdschema-file config/TDengineSchema.toml -scale-var 10 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" > data/tdengine.dat + + + +docker network create --ip-range 172.15.1.255/24 --subnet 172.15.1.1/16 tsdbcomp >>/dev/null 2>&1 + + +TDENGINE=`docker run -d --net tsdbcomp --ip 172.15.1.6 -p 6030:6030 -p 6020:6020 -p 6031:6031 -p 6032:6032 -p 6033:6033 -p 6034:6034 -p 6035:6035 -p 6036:6036 -p 6037:6037 -p 6038:6038 -p 6039:6039 tdengine/tdengine:1.6.4.5` +echo +echo "------------------Writing Data-----------------" +echo +sleep 5 +echo +echo -e "Start test TDengine, result in ${GREEN}Green line${NC}" + +TDENGINERES=`cat data/tdengine.dat |bin/bulk_load_tdengine --url 172.15.1.6:0 --batch-size 300 -do-load -report-tags n1 -workers 20 -fileout=false| grep loaded` +#TDENGINERES=`cat data/tdengine.dat |gunzip|bin/bulk_load_tdengine --url 172.15.1.6:0 --batch-size 300 -do-load -report-tags n1 -workers 10 -fileout=false| grep loaded` +echo +echo -e "${GREEN}TDengine writing result:${NC}" +echo -e "${GREEN}$TDENGINERES${NC}" +DATA=`echo $TDENGINERES|awk '{print($2)}'` +TMP=`echo $TDENGINERES|awk '{print($5)}'` +TDWTM=`echo ${TMP%s*}` + + +INFLUX=`docker run -d -p 8086:8086 --net tsdbcomp --ip 172.15.1.5 influxdb` >>/dev/null 2>&1 +sleep 10 +echo +echo -e "Start test InfluxDB, result in ${GREEN}Green line${NC}" + + +INFLUXRES=`cat data/influx.dat |bin/bulk_load_influx --batch-size=5000 --workers=20 --urls="http://172.15.1.5:8086" | grep loaded` + + +echo +echo -e "${GREEN}InfluxDB writing result:${NC}" +echo -e "${GREEN}$INFLUXRES${NC}" + +TMP=`echo $INFLUXRES|awk '{print($5)}'` +IFWTM=`echo ${TMP%s*}` + +echo +echo "------------------Querying Data-----------------" +echo + +sleep 10 +echo +echo "start query test, query max from 8 hosts group by 1 hour, TDengine" +echo + +#Test case 1 +#测试用例1,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') ; +# a,b,c,d,e,f,g,h are random 8 numbers. +TDQS1=`bin/bulk_query_gen -seed 123 -format tdengine -query-type 8-host-all -scale-var 10 -queries 1000 | bin/query_benchmarker_tdengine -urls="http://172.15.1.6:6020" -workers 50 -print-interval 0|grep wall` +echo +echo -e "${GREEN}TDengine query test case 1 result:${NC}" +echo -e "${GREEN}$TDQS1${NC}" +TMP=`echo $TDQS1|awk '{print($4)}'` +TDQ1=`echo ${TMP%s*}` + +#Test case 2 +#测试用例2,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以1小时为粒度,查询每1小时的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') interval(1h); +# a,b,c,d,e,f,g,h are random 8 numbers +TDQS2=`bin/bulk_query_gen -seed 123 -format tdengine -query-type 8-host-allbyhr -scale-var 10 -queries 1000 | bin/query_benchmarker_tdengine -urls="http://172.15.1.6:6020" -workers 50 -print-interval 0|grep wall` + +echo +echo -e "${GREEN}TDengine query test case 2 result:${NC}" +echo -e "${GREEN}$TDQS2${NC}" +TMP=`echo $TDQS2|awk '{print($4)}'` +TDQ2=`echo ${TMP%s*}` + +#Test case 3 +#测试用例3,测试用例3,随机查询12个小时的数据,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以10分钟为粒度,查询每10分钟的最大值 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') and time >x and time x and time x and time x and time >/dev/null 2>&1 +docker container rm -f $INFLUX >>/dev/null 2>&1 +docker stop $TDENGINE >>/dev/null 2>&1 +docker container rm -f $TDENGINE >>/dev/null 2>&1 +docker network rm tsdbcomp >>/dev/null 2>&1 +#bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086" +#bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020" + + +today=`date +"%Y%m%d"` +echo "${today}, ${IFWTM}, ${IFQ1}, ${IFQ2}, ${IFQ3}, ${IFQ4}" >> /root/perftest-influx-report.csv + diff --git a/tests/perftest-scripts/runInfluxdb-13d-csv.sh b/tests/perftest-scripts/runInfluxdb-13d-csv.sh new file mode 100755 index 0000000000000000000000000000000000000000..2eb36068688e8e180165c487bdeacbba9a9e7b73 --- /dev/null +++ b/tests/perftest-scripts/runInfluxdb-13d-csv.sh @@ -0,0 +1,126 @@ +#!/bin/bash + + +# Color setting +RED='\033[0;31m' +GREEN='\033[1;32m' +GREEN_DARK='\033[0;32m' +GREEN_UNDERLINE='\033[4;32m' +NC='\033[0m' +docker rm -f `docker ps -a -q` +#set -x +echo +echo "---------------Generating Data-----------------" +echo +echo "Prepare data for InfluxDB...." +#bin/bulk_data_gen -seed 123 -format influx-bulk -scale-var 100 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" >data/influx.dat +bin/bulk_data_gen -seed 123 -format influx-bulk -sampling-interval 1s -scale-var 10 -use-case devops -timestamp-start "2018-01-02T00:00:00Z" -timestamp-end "2018-01-15T00:00:00Z" > /mnt/data/influx.dat + +docker network create --ip-range 172.15.1.255/24 --subnet 172.15.1.1/16 tsdbcomp >>/dev/null 2>&1 + +INFLUX=`docker run -d -p 8086:8086 --net tsdbcomp --ip 172.15.1.5 influxdb` >>/dev/null 2>&1 +sleep 10 +echo +echo -e "Start test InfluxDB, result in ${GREEN}Green line${NC}" + + +INFLUXRES=`cat /mnt/data/influx.dat |bin/bulk_load_influx --batch-size=5000 --workers=20 --urls="http://172.15.1.5:8086" | grep loaded` + + +echo +echo -e "${GREEN}InfluxDB writing result:${NC}" +echo -e "${GREEN}$INFLUXRES${NC}" +DATA=`echo $INFLUXRES|awk '{print($2)}'` +TMP=`echo $INFLUXRES|awk '{print($5)}'` +IFWTM=`echo ${TMP%s*}` + +echo +echo "------------------Querying Data-----------------" +echo + +sleep 10 + +echo +echo "start query test, query max from 8 hosts group by 1hour, Influxdb" +echo +#Test case 1 +#测试用例1,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') ; +# a,b,c,d,e,f,g,h are random 8 numbers. +IFQS1=`bin/bulk_query_gen -seed 123 -format influx-http -query-type 8-host-all -scale-var 10 -queries 1000 | bin/query_benchmarker_influxdb -urls="http://172.15.1.5:8086" -workers 50 -print-interval 0|grep wall` +echo -e "${GREEN}InfluxDB query test case 1 result:${NC}" +echo -e "${GREEN}$IFQS1${NC}" +TMP=`echo $IFQS1|awk '{print($4)}'` +IFQ1=`echo ${TMP%s*}` +#Test case 2 +#测试用例2,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以1小时为粒度,查询每1小时的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') interval(1h); +# a,b,c,d,e,f,g,h are random 8 numbers +IFQS2=`bin/bulk_query_gen -seed 123 -format influx-http -query-type 8-host-allbyhr -scale-var 10 -queries 1000 | bin/query_benchmarker_influxdb -urls="http://172.15.1.5:8086" -workers 50 -print-interval 0|grep wall` +echo -e "${GREEN}InfluxDB query test case 2 result:${NC}" +echo -e "${GREEN}$IFQS2${NC}" +TMP=`echo $IFQS2|awk '{print($4)}'` +IFQ2=`echo ${TMP%s*}` +#Test case 3 +#测试用例3,测试用例3,随机查询12个小时的数据,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以10分钟为粒度,查询每10分钟的最大值 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') and time >x and time x and time > /root/perftest-influxdb-report-13d.csv + +docker stop $INFLUX >>/dev/null 2>&1 +docker container rm -f $INFLUX >>/dev/null 2>&1 +docker network rm tsdbcomp >>/dev/null 2>&1 +#bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086" +#bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020" + diff --git a/tests/perftest-scripts/runreal-13d-csv.sh b/tests/perftest-scripts/runreal-13d-csv.sh new file mode 100755 index 0000000000000000000000000000000000000000..ff7ce41d4bd2f85d6911cf5f9d194a487fc4df07 --- /dev/null +++ b/tests/perftest-scripts/runreal-13d-csv.sh @@ -0,0 +1,149 @@ +#!/bin/bash + + +# Color setting +RED='\033[0;31m' +GREEN='\033[1;32m' +GREEN_DARK='\033[0;32m' +GREEN_UNDERLINE='\033[4;32m' +NC='\033[0m' +#set -x +echo +echo "---------------Generating Data-----------------" +echo + +echo +echo "Prepare data for TDengine...." +#bin/bulk_data_gen -seed 123 -format tdengine -tdschema-file config/TDengineSchema.toml -scale-var 100 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" > data/tdengine.dat +bin/bulk_data_gen -seed 123 -format tdengine -sampling-interval 1s -tdschema-file config/TDengineSchema.toml -scale-var 10 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-14T00:00:00Z" > /mnt/data/tdengine.dat + + +echo +echo -e "Start test TDengine, result in ${GREEN}Green line${NC}" + +for i in {1..5}; do + TDENGINERES=`cat /mnt/data/tdengine.dat |bin/bulk_load_tdengine --url 127.0.0.1:0 --batch-size 5000 -do-load -report-tags n1 -workers 20 -fileout=false| grep loaded` +#TDENGINERES=`cat data/tdengine.dat |gunzip|bin/bulk_load_tdengine --url 127.0.0.1:0 --batch-size 300 -do-load -report-tags n1 -workers 10 -fileout=false| grep loaded` + echo + echo -e "${GREEN}TDengine writing result:${NC}" + echo -e "${GREEN}$TDENGINERES${NC}" + DATA=`echo $TDENGINERES|awk '{print($2)}'` + TMP=`echo $TDENGINERES|awk '{print($5)}'` + TDWTM=`echo ${TMP%s*}` + + [ -z "$TDWTM" ] || break +done + + + +echo +echo "------------------Querying Data-----------------" +echo + +sleep 10 +echo +echo "start query test, query max from 8 hosts group by 1 hour, TDengine" +echo + +#Test case 1 +#测试用例1,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') ; +# a,b,c,d,e,f,g,h are random 8 numbers. +for i in {1..5}; do + TDQS1=`bin/bulk_query_gen -seed 123 -format tdengine -query-type 8-host-all -scale-var 10 -queries 1000 | bin/query_benchmarker_tdengine -urls="http://127.0.0.1:6020" -workers 50 -print-interval 0|grep wall` + echo + echo -e "${GREEN}TDengine query test case 1 result:${NC}" + echo -e "${GREEN}$TDQS1${NC}" + TMP=`echo $TDQS1|awk '{print($4)}'` + TDQ1=`echo ${TMP%s*}` + + [ -z "$TDQ1" ] || break +done + +#Test case 2 +#测试用例2,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以1小时为粒度,查询每1小时的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') interval(1h); +# a,b,c,d,e,f,g,h are random 8 numbers +for i in {1..5}; do + TDQS2=`bin/bulk_query_gen -seed 123 -format tdengine -query-type 8-host-allbyhr -scale-var 10 -queries 1000 | bin/query_benchmarker_tdengine -urls="http://127.0.0.1:6020" -workers 50 -print-interval 0|grep wall` + + echo + echo -e "${GREEN}TDengine query test case 2 result:${NC}" + echo -e "${GREEN}$TDQS2${NC}" + TMP=`echo $TDQS2|awk '{print($4)}'` + TDQ2=`echo ${TMP%s*}` + + [ -z "$TDQ2" ] || break +done + +#Test case 3 +#测试用例3,测试用例3,随机查询12个小时的数据,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以10分钟为粒度,查询每10分钟的最大值 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') and time >x and time x and time > /root/perftest-13d-report.csv + +#bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086" +#bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020" diff --git a/tests/perftest-scripts/runreal-1d-csv.sh b/tests/perftest-scripts/runreal-1d-csv.sh new file mode 100755 index 0000000000000000000000000000000000000000..5cd113aadf1c05bb67a061b969b842872a163bca --- /dev/null +++ b/tests/perftest-scripts/runreal-1d-csv.sh @@ -0,0 +1,149 @@ +#!/bin/bash + + +# Color setting +RED='\033[0;31m' +GREEN='\033[1;32m' +GREEN_DARK='\033[0;32m' +GREEN_UNDERLINE='\033[4;32m' +NC='\033[0m' +#set -x +echo +echo "---------------Generating Data-----------------" +echo + +echo +echo "Prepare data for TDengine...." +#bin/bulk_data_gen -seed 123 -format tdengine -tdschema-file config/TDengineSchema.toml -scale-var 100 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" > data/tdengine.dat +bin/bulk_data_gen -seed 123 -format tdengine -sampling-interval 1s -tdschema-file config/TDengineSchema.toml -scale-var 10 -use-case devops -timestamp-start "2018-01-01T00:00:00Z" -timestamp-end "2018-01-02T00:00:00Z" > data/tdengine.dat + + +echo +echo -e "Start test TDengine, result in ${GREEN}Green line${NC}" + +for i in {1..5}; do + TDENGINERES=`cat data/tdengine.dat |bin/bulk_load_tdengine --url 127.0.0.1:0 --batch-size 300 -do-load -report-tags n1 -workers 20 -fileout=false| grep loaded` +#TDENGINERES=`cat data/tdengine.dat |gunzip|bin/bulk_load_tdengine --url 127.0.0.1:0 --batch-size 300 -do-load -report-tags n1 -workers 10 -fileout=false| grep loaded` + echo + echo -e "${GREEN}TDengine writing result:${NC}" + echo -e "${GREEN}$TDENGINERES${NC}" + DATA=`echo $TDENGINERES|awk '{print($2)}'` + TMP=`echo $TDENGINERES|awk '{print($5)}'` + TDWTM=`echo ${TMP%s*}` + + [ -z "$TDWTM" ] || break +done + + + +echo +echo "------------------Querying Data-----------------" +echo + +sleep 10 +echo +echo "start query test, query max from 8 hosts group by 1 hour, TDengine" +echo + +#Test case 1 +#测试用例1,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') ; +# a,b,c,d,e,f,g,h are random 8 numbers. +for i in {1..5}; do + TDQS1=`bin/bulk_query_gen -seed 123 -format tdengine -query-type 8-host-all -scale-var 10 -queries 1000 | bin/query_benchmarker_tdengine -urls="http://127.0.0.1:6020" -workers 50 -print-interval 0|grep wall` + echo + echo -e "${GREEN}TDengine query test case 1 result:${NC}" + echo -e "${GREEN}$TDQS1${NC}" + TMP=`echo $TDQS1|awk '{print($4)}'` + TDQ1=`echo ${TMP%s*}` + + [ -z "$TDQ1" ] || break +done + +#Test case 2 +#测试用例2,查询所有数据中,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以1小时为粒度,查询每1小时的最大值。 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') interval(1h); +# a,b,c,d,e,f,g,h are random 8 numbers +for i in {1..5}; do + TDQS2=`bin/bulk_query_gen -seed 123 -format tdengine -query-type 8-host-allbyhr -scale-var 10 -queries 1000 | bin/query_benchmarker_tdengine -urls="http://127.0.0.1:6020" -workers 50 -print-interval 0|grep wall` + + echo + echo -e "${GREEN}TDengine query test case 2 result:${NC}" + echo -e "${GREEN}$TDQS2${NC}" + TMP=`echo $TDQS2|awk '{print($4)}'` + TDQ2=`echo ${TMP%s*}` + + [ -z "$TDQ2" ] || break +done + +#Test case 3 +#测试用例3,测试用例3,随机查询12个小时的数据,用8个hostname标签进行匹配,匹配出这8个hostname对应的模拟服务器CPU数据中的usage_user这个监控数据,以10分钟为粒度,查询每10分钟的最大值 +#select max(usage_user) from cpu where(hostname='host_a' and hostname='host_b'and hostname='host_c'and hostname='host_d'and hostname='host_e'and hostname='host_f' and hostname='host_g'and hostname='host_h') and time >x and time x and time > /root/perftest-1d-report.csv + +#bulk_query_gen/bulk_query_gen -format influx-http -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_influxdb/query_benchmarker_influxdb -urls="http://172.26.89.231:8086" +#bulk_query_gen/bulk_query_gen -format tdengine -query-type 1-host-1-hr -scale-var 10 -queries 1000 | query_benchmarker_tdengine/query_benchmarker_tdengine -urls="http://172.26.89.231:6020" diff --git a/tests/perftest-scripts/taosdemo-csv2png.gnuplot b/tests/perftest-scripts/taosdemo-csv2png.gnuplot new file mode 100644 index 0000000000000000000000000000000000000000..9fcd4bb3d99cd64d97b3e2aa2257e384e19fc3ab --- /dev/null +++ b/tests/perftest-scripts/taosdemo-csv2png.gnuplot @@ -0,0 +1,26 @@ +#!/user/bin/gnuplot +reset +set terminal png + +set title "TaosDemo Performance Report" font ",20" + +set ylabel "Time in Seconds" + +set xdata time +set timefmt "%Y%m%d" +set format x "%Y-%m-%d" +set xlabel "Date" + +set style data linespoints + +set terminal pngcairo size 1024,768 enhanced font 'Segoe UI, 10' +set output 'taosdemo-report.png' +set datafile separator ',' + +set key reverse Left outside +set grid + + +plot 'taosdemo-report.csv' using 1:2 title "Create 10,000 Table", \ + "" using 1:3 title "Create 10,000 Table and Insert 100,000 data", \ + "" using 1:4 title "Request Per Second of Insert 100,000 data" diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 44c978957e0ccf49805ded9e5c26ba0bfb3a656b..2e5d84d45f1cc7545c0af509087f930491d4bd0b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -14,6 +14,7 @@ from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel import sys +import traceback # Require Python 3 if sys.version_info[0] < 3: raise Exception("Must be using Python 3") @@ -24,11 +25,13 @@ import copy import threading import random +import time import logging import datetime import textwrap from typing import List +from typing import Dict from util.log import * from util.dnodes import * @@ -45,6 +48,14 @@ logger = None def runThread(wt: WorkerThread): wt.run() +class CrashGenError(Exception): + def __init__(self, msg=None, errno=None): + self.msg = msg + self.errno = errno + + def __str__(self): + return self.msg + class WorkerThread: def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, @@ -63,10 +74,10 @@ class WorkerThread: self._dbConn = DbConn() def logDebug(self, msg): - logger.info(" t[{}] {}".format(self._tid, msg)) + logger.debug(" TRD[{}] {}".format(self._tid, msg)) def logInfo(self, msg): - logger.info(" t[{}] {}".format(self._tid, msg)) + logger.info(" TRD[{}] {}".format(self._tid, msg)) def getTaskExecutor(self): @@ -95,15 +106,19 @@ class WorkerThread: while True: tc = self._tc # Thread Coordinator, the overall master tc.crossStepBarrier() # shared barrier first, INCLUDING the last one - logger.debug("Thread task loop exited barrier...") + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped - logger.debug("Thread task loop exited step gate...") + logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): + logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() + logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) + logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): @@ -123,7 +138,7 @@ class WorkerThread: self.verifyThreadSelf() # only allowed by ourselves # Wait again at the "gate", waiting to be "tapped" - # logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) + logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid)) self._stepGate.wait() self._stepGate.clear() @@ -133,33 +148,40 @@ class WorkerThread: self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - logger.debug("Tapping worker thread {}".format(self._tid)) + logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) self._stepGate.set() # wake up! time.sleep(0) # let the released thread run a bit - def execSql(self, sql): # not "execute", since we are out side the DB context + def execSql(self, sql): # TODO: expose DbConn directly if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: return self._tc.getDbState().getDbConn().execute(sql) - def querySql(self, sql): # not "execute", since we are out side the DB context + def getDbConn(self): if ( gConfig.per_thread_db_connection ): - return self._dbConn.query(sql) + return self._dbConn else: - return self._tc.getDbState().getDbConn().query(sql) - + return self._tc.getDbState().getDbConn() + + # def querySql(self, sql): # not "execute", since we are out side the DB context + # if ( gConfig.per_thread_db_connection ): + # return self._dbConn.query(sql) + # else: + # return self._tc.getDbState().getDbConn().query(sql) + class ThreadCoordinator: - def __init__(self, pool, wd: WorkDispatcher, dbState): + def __init__(self, pool, dbState): self._curStep = -1 # first step is 0 self._pool = pool - self._wd = wd + # self._wd = wd self._te = None # prepare for every new step self._dbState = dbState self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads + self._execStats = ExecutionStats() def getTaskExecutor(self): return self._te @@ -176,41 +198,63 @@ class ThreadCoordinator: # Coordinate all threads step by step self._curStep = -1 # not started yet maxSteps = gConfig.max_steps # type: ignore - while(self._curStep < maxSteps): - print(".", end="", flush=True) - logger.debug("Main thread going to sleep") + self._execStats.startExec() # start the stop watch + failed = False + while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 + if not gConfig.debug: + print(".", end="", flush=True) # print this only if we are not in debug mode + logger.debug("[TRD] Main thread going to sleep") # Now ready to enter a step self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate self._stepBarrier.reset() # Other worker threads should now be at the "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + try: + self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + except taos.error.ProgrammingError as err: + if ( err.msg == 'network unavailable' ): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + failed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at end, and maybe signal them to stop + else: + raise + finally: + pass + self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step - logger.info("<-- Step {} finished".format(self._curStep)) + logger.debug("<-- Step {} finished".format(self._curStep)) self._curStep += 1 # we are about to get into next step. TODO: race condition here! logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep # A new TE for the new step - self._te = TaskExecutor(self._curStep) + if not failed: # only if not failed + self._te = TaskExecutor(self._curStep) - logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep + logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep self.tapAllThreads() logger.debug("Main thread ready to finish up...") - self.crossStepBarrier() # Cross it one last time, after all threads finish - self._stepBarrier.reset() - logger.debug("Main thread in exclusive zone...") - self._te = None # No more executor, time to end - logger.debug("Main thread tapping all threads one last time...") - self.tapAllThreads() # Let the threads run one last time + if not failed: # only in regular situations + self.crossStepBarrier() # Cross it one last time, after all threads finish + self._stepBarrier.reset() + logger.debug("Main thread in exclusive zone...") + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish + logger.info("All worker thread finished") + self._execStats.endExec() - logger.info("All threads finished") - print("\r\nFinished") + def logStats(self): + self._execStats.logStats() def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -219,7 +263,7 @@ class ThreadCoordinator: wakeSeq.append(i) else: wakeSeq.insert(0, i) - logger.info("Waking up threads: {}".format(str(wakeSeq))) + logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq))) # TODO: set dice seed to a deterministic value for i in wakeSeq: self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! @@ -233,11 +277,15 @@ class ThreadCoordinator: raise RuntimeError("Cannot fetch task when not running") # return self._wd.pickTask() # Alternatively, let's ask the DbState for the appropriate task - dbState = self.getDbState() - tasks = dbState.getTasksAtState() - i = Dice.throw(len(tasks)) - # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. - return tasks[i].clone() + # dbState = self.getDbState() + # tasks = dbState.getTasksAtState() # TODO: create every time? + # nTasks = len(tasks) + # i = Dice.throw(nTasks) + # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) + # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + # return tasks[i].clone() # TODO: still necessary? + taskType = self.getDbState().pickTaskType() # pick a task type for current state + return taskType(self.getDbState(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -253,7 +301,7 @@ class ThreadPool: self.maxSteps = maxSteps self.funcSequencer = funcSequencer # Internal class variables - self.dispatcher = WorkDispatcher(dbState) + # self.dispatcher = WorkDispatcher(dbState) # Obsolete? self.curStep = 0 self.threadList = [] # self.stepGate = threading.Condition() # Gate to hold/sync all threads @@ -362,7 +410,7 @@ class DbConn: # Get the connection/cursor ready self._cursor.execute('reset query cache') - # self._cursor.execute('use db') + # self._cursor.execute('use db') # note we do this in _findCurrenState # Open connection self._tdSql = TDSql() @@ -390,28 +438,265 @@ class DbConn: def execute(self, sql): if ( not self.isOpen ): raise RuntimeError("Cannot execute database commands until connection is open") - return self._tdSql.execute(sql) + logger.debug("[SQL] Executing SQL: {}".format(sql)) + nRows = self._tdSql.execute(sql) + logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + return nRows - def query(self, sql) -> int : # return number of rows retrieved + def query(self, sql) : # return rows affected if ( not self.isOpen ): raise RuntimeError("Cannot query database until connection is open") - return self._tdSql.query(sql) + logger.debug("[SQL] Executing SQL: {}".format(sql)) + nRows = self._tdSql.query(sql) + logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + return nRows + # results are in: return self._tdSql.queryResult + def getQueryResult(self): + return self._tdSql.queryResult -# State of the database as we believe it to be -class DbState(): + def _queryAny(self, sql) : # actual query result as an int + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") + tSql = self._tdSql + nRows = tSql.query(sql) + if nRows != 1 : + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) + if tSql.queryRows != 1 or tSql.queryCols != 1: + raise RuntimeError("Unexpected result set for query: {}".format(sql)) + return tSql.queryResult[0][0] + + def queryScalar(self, sql) -> int : + return self._queryAny(sql) + + def queryString(self, sql) -> str : + return self._queryAny(sql) + +class AnyState: STATE_INVALID = -1 - STATE_EMPTY = 1 # nothing there, no even a DB - STATE_DB_ONLY = 2 # we have a DB, but nothing else - STATE_TABLE_ONLY = 3 # we have a table, but totally empty - STATE_HAS_DATA = 4 # we have some data in the table + STATE_EMPTY = 0 # nothing there, no even a DB + STATE_DB_ONLY = 1 # we have a DB, but nothing else + STATE_TABLE_ONLY = 2 # we have a table, but totally empty + STATE_HAS_DATA = 3 # we have some data in the table + _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"] + + STATE_VAL_IDX = 0 + CAN_CREATE_DB = 1 + CAN_DROP_DB = 2 + CAN_CREATE_FIXED_SUPER_TABLE = 3 + CAN_DROP_FIXED_SUPER_TABLE = 4 + CAN_ADD_DATA = 5 + CAN_READ_DATA = 6 + + def __init__(self): + self._info = self.getInfo() + + def __str__(self): + return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] # -1 hack to accomodate the STATE_INVALID case + + def getInfo(self): + raise RuntimeError("Must be overriden by child classes") + + def equals(self, other): + if isinstance(other, int): + return self.getValIndex() == other + elif isinstance(other, AnyState): + return self.getValIndex() == other.getValIndex() + else: + raise RuntimeError("Unexpected comparison, type = {}".format(type(other))) + + def verifyTasksToState(self, tasks, newState): + raise RuntimeError("Must be overriden by child classes") + + def getValIndex(self): + return self._info[self.STATE_VAL_IDX] + + def getValue(self): + return self._info[self.STATE_VAL_IDX] + def canCreateDb(self): + return self._info[self.CAN_CREATE_DB] + def canDropDb(self): + return self._info[self.CAN_DROP_DB] + def canCreateFixedSuperTable(self): + return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] + def canDropFixedSuperTable(self): + return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] + def canAddData(self): + return self._info[self.CAN_ADD_DATA] + def canReadData(self): + return self._info[self.CAN_READ_DATA] + + def assertAtMostOneSuccess(self, tasks, cls): + sCnt = 0 + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + # task.logDebug("Task success found") + sCnt += 1 + if ( sCnt >= 2 ): + raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + + def assertIfExistThenSuccess(self, tasks, cls): + sCnt = 0 + exists = False + for task in tasks : + if not isinstance(task, cls): + continue + exists = True # we have a valid instance + if task.isSuccess(): + sCnt += 1 + if ( exists and sCnt <= 0 ): + raise RuntimeError("Unexpected zero success for task: {}".format(cls)) + + def assertNoTask(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__)) + + def assertNoSuccess(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + if task.isSuccess(): + raise RuntimeError("Unexpected successful task: {}".format(cls)) + + def hasSuccess(self, tasks, cls): + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + return True + return False + + def hasTask(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + return True + return False + +class StateInvalid(AnyState): + def getInfo(self): + return [ + self.STATE_INVALID, + False, False, # can create/drop Db + False, False, # can create/drop fixed table + False, False, # can insert/read data with fixed table + ] + + # def verifyTasksToState(self, tasks, newState): +class StateEmpty(AnyState): + def getInfo(self): + return [ + self.STATE_EMPTY, + True, False, # can create/drop Db + False, False, # can create/drop fixed table + False, False, # can insert/read data with fixed table + ] + + def verifyTasksToState(self, tasks, newState): + if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB + if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks + self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers + +class StateDbOnly(AnyState): + def getInfo(self): + return [ + self.STATE_DB_ONLY, + False, True, + True, False, + False, False, + ] + + def verifyTasksToState(self, tasks, newState): + self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases + self.assertIfExistThenSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases + # Nothing to be said about adding data task + if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess + self.assertAtMostOneSuccess(tasks, DropDbTask) + # self._state = self.STATE_EMPTY + elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful + self.assertNoTask(tasks, DropDbTask) # should have have tried + # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet + # # can't say there's add-data attempts, since they may all fail + # self._state = self.STATE_TABLE_ONLY + # else: + # self._state = self.STATE_HAS_DATA + # What about AddFixedData? + # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): + # self._state = self.STATE_HAS_DATA + # else: # no success in dropping db tasks, no success in create fixed table? read data should also fail + # # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks, + # self._state = self.STATE_DB_ONLY # no change + +class StateSuperTableOnly(AnyState): + def getInfo(self): + return [ + self.STATE_TABLE_ONLY, + False, True, + False, True, + True, True, + ] + + def verifyTasksToState(self, tasks, newState): + if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) + # self._state = self.STATE_DB_ONLY + # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data + # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases + # self._state = self.STATE_HAS_DATA + # elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data + # self.assertNoTask(tasks, DropFixedTableTask) + # self.assertNoTask(tasks, AddFixedDataTask) + # self._state = self.STATE_TABLE_ONLY # no change + # else: # did not drop table, did not insert data, did not read successfully, that is impossible + # raise RuntimeError("Unexpected no-success scenarios") + # TODO: need to revamp!! + +class StateHasData(AnyState): + def getInfo(self): + return [ + self.STATE_HAS_DATA, + False, True, + False, True, + True, True, + ] + + def verifyTasksToState(self, tasks, newState): + if ( newState.equals(AnyState.STATE_EMPTY) ): + self.hasSuccess(tasks, DropDbTask) + self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only + if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task + self.assertNoTask(tasks, DropDbTask) # we must have drop_db task + self.hasSuccess(tasks, DropFixedSuperTableTask) + self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy + elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted + self.assertNoTask(tasks, DropDbTask) + self.assertNoTask(tasks, DropFixedSuperTableTask) + self.assertNoTask(tasks, AddFixedDataTask) + # self.hasSuccess(tasks, DeleteDataTasks) + else: + self.assertNoTask(tasks, DropDbTask) + self.assertNoTask(tasks, DropFixedSuperTableTask) + self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) + + +# State of the database as we believe it to be +class DbState(): + def __init__(self): self.tableNumQueue = LinearQueue() self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - self._state = self.STATE_INVALID + + self._state = StateInvalid() # starting state + self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. # self.openDbServerConnection() self._dbConn = DbConn() @@ -425,10 +710,10 @@ class DbState(): else: raise except: - print("[=]Unexpected exception") + print("[=] Unexpected exception") raise self._dbConn.resetDb() # drop and recreate DB - self._state = self.STATE_EMPTY # initial state, the result of above + self._state = StateEmpty() # initial state, the result of above def getDbConn(self): return self._dbConn @@ -441,8 +726,8 @@ class DbState(): tIndex = self.tableNumQueue.push() return tIndex - def getFixedTableName(self): - return "fixed_table" + def getFixedSuperTableName(self): + return "fs_table" def releaseTable(self, i): # return the table back, so others can use it self.tableNumQueue.release(i) @@ -456,6 +741,12 @@ class DbState(): with self._lock: self._lastInt += 1 return self._lastInt + + def getNextBinary(self): + return "Los_Angeles_{}".format(self.getNextInt()) + + def getNextFloat(self): + return 0.9 + self.getNextInt() def getTableNameToDelete(self): tblNum = self.tableNumQueue.pop() # TODO: race condition! @@ -464,134 +755,124 @@ class DbState(): return "table_{}".format(tblNum) - def execSql(self, sql): # using the main DB connection - return self._dbConn.execute(sql) - def cleanUp(self): self._dbConn.close() - def getTasksAtState(self): - tasks = [] - tasks.append(ReadFixedDataTask(self)) # always - if ( self._state == self.STATE_EMPTY ): - tasks.append(CreateDbTask(self)) - tasks.append(CreateFixedTableTask(self)) - elif ( self._state == self.STATE_DB_ONLY ): - tasks.append(DropDbTask(self)) - tasks.append(CreateFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) - elif ( self._state == self.STATE_TABLE_ONLY ): - tasks.append(DropFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) - elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust - tasks.append(DropFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) + # May be slow, use cautionsly... + def getTaskTypesAtState(self): + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks + firstTaskTypes = [] + for tc in allTaskClasses: + # t = tc(self) # create task object + if tc.canBeginFrom(self._state): + firstTaskTypes.append(tc) + # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones + taskTypes = firstTaskTypes.copy() # have to have these + for task1 in firstTaskTypes: # each task type gathered so far + endState = task1.getEndState() # figure the end state + if endState == None: + continue + for tc in allTaskClasses: # what task can further begin from there? + if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): + taskTypes.append(tc) # gather it + + if len(taskTypes) <= 0: + raise RuntimeError("No suitable task types found for state: {}".format(self._state)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes)) + return taskTypes + + # tasks.append(ReadFixedDataTask(self)) # always for everybody + # if ( self._state == self.STATE_EMPTY ): + # tasks.append(CreateDbTask(self)) + # tasks.append(CreateFixedTableTask(self)) + # elif ( self._state == self.STATE_DB_ONLY ): + # tasks.append(DropDbTask(self)) + # tasks.append(CreateFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # elif ( self._state == self.STATE_TABLE_ONLY ): + # tasks.append(DropFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust + # tasks.append(DropFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # else: + # raise RuntimeError("Unexpected DbState state: {}".format(self._state)) + # return tasks + + def pickTaskType(self): + taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state + weights = [] + for tt in taskTypes: + endState = tt.getEndState() + if endState != None : + weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method + else: + weights.append(10) # read data task, default to 10: TODO: change to a constant + i = self._weighted_choice_sub(weights) + # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + return taskTypes[i] + + def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ + rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? + for i, w in enumerate(weights): + rnd -= w + if rnd < 0: + return i + + def _findCurrentState(self): + dbc = self._dbConn + ts = time.time() + if dbc.query("show databases") == 0 : # no database?! + # logger.debug("Found EMPTY state") + logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) + return StateEmpty() + dbc.execute("use db") # did not do this when openning connection + if dbc.query("show tables") == 0 : # no tables + # logger.debug("Found DB ONLY state") + logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) + return StateDbOnly() + if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data + # logger.debug("Found TABLE_ONLY state") + logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) + return StateSuperTableOnly() else: - raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - return tasks - + # logger.debug("Found HAS_DATA state") + logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) + return StateHasData() + def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty return # do nothing - if ( self._state == self.STATE_EMPTY ): - # self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table - if ( self.hasSuccess(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class - self._state = self.STATE_DB_ONLY - if ( self.hasSuccess(tasks, CreateFixedTableTask )): - self._state = self.STATE_TABLE_ONLY - # else: # no successful table creation, not much we can say, as it is step 2 - else: # did not create db - self.assertNoTask(tasks, CreateDbTask) # because we did not have such task - # self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task - self.assertNoSuccess(tasks, CreateFixedTableTask) - - elif ( self._state == self.STATE_DB_ONLY ): - self.assertAtMostOneSuccess(tasks, DropDbTask) - self.assertIfExistThenSuccess(tasks, DropDbTask) - self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) - # Nothing to be said about adding data task - if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB - # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - self.assertAtMostOneSuccess(tasks, DropDbTask) - self._state = self.STATE_EMPTY - elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success - # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful - self.assertNoTask(tasks, DropDbTask) # should have have tried - if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet - # can't say there's add-data attempts, since they may all fail - self._state = self.STATE_TABLE_ONLY - else: - self._state = self.STATE_HAS_DATA - else: # no success in dropping db tasks, no success in create fixed table, not acceptable - raise RuntimeError("Unexpected no-success scenario") - - elif ( self._state == self.STATE_TABLE_ONLY ): - if ( self.hasSuccess(tasks, DropFixedTableTask) ): - self.assertAtMostOneSuccess(tasks, DropFixedTableTask) - self._state = self.STATE_DB_ONLY - elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table - self.assertNoTask(tasks, DropFixedTableTask) - self._state = self.STATE_HAS_DATA - else: # did not drop table, did not insert data, that is impossible - raise RuntimeError("Unexpected no-success scenarios") - - elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust - if ( self.hasSuccess(tasks, DropFixedTableTask) ): - self.assertAtMostOneSuccess(tasks, DropFixedTableTask) - self._state = self.STATE_DB_ONLY - elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table - self.assertNoTask(tasks, DropFixedTableTask) - self._state = self.STATE_HAS_DATA - else: # did not drop table, did not insert data, that is impossible - raise RuntimeError("Unexpected no-success scenarios") - else: - raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - logger.debug("New DB state is: {}".format(self._state)) + self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps - def assertAtMostOneSuccess(self, tasks, cls): - sCnt = 0 - for task in tasks : - if not isinstance(task, cls): - continue - if task.isSuccess(): - task.logDebug("Task success found") - sCnt += 1 - if ( sCnt >= 2 ): - raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + # Generic Checks, first based on the start state + if self._state.canCreateDb(): + self._state.assertIfExistThenSuccess(tasks, CreateDbTask) + # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops - def assertIfExistThenSuccess(self, tasks, cls): - sCnt = 0 - exists = False - for task in tasks : - if not isinstance(task, cls): - continue - exists = True # we have a valid instance - if task.isSuccess(): - sCnt += 1 - if ( exists and sCnt <= 0 ): - raise RuntimeError("Unexpected zero success for task: {}".format(cls)) + if self._state.canDropDb(): + self._state.assertIfExistThenSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop - def assertNoTask(self, tasks, cls): - for task in tasks : - if isinstance(task, cls): - raise RuntimeError("Unexpected task: {}".format(cls)) + # if self._state.canCreateFixedTable(): + # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create - def assertNoSuccess(self, tasks, cls): - for task in tasks : - if isinstance(task, cls): - if task.isSuccess(): - raise RuntimeError("Unexpected successful task: {}".format(cls)) + # if self._state.canDropFixedTable(): + # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped + # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop - def hasSuccess(self, tasks, cls): - for task in tasks : - if not isinstance(task, cls): - continue - if task.isSuccess(): - return True - return False + # if self._state.canAddData(): + # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually + + # if self._state.canReadData(): + # Nothing for sure + + newState = self._findCurrentState() + logger.debug("[STT] New DB state determined: {}".format(newState)) + self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? + self._state = newState class TaskExecutor(): def __init__(self, curStep): @@ -614,10 +895,11 @@ class Task(): @classmethod def allocTaskNum(cls): - cls.taskSn += 1 - return cls.taskSn + Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy + # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) + return Task.taskSn - def __init__(self, dbState: DbState): + def __init__(self, dbState: DbState, execStats: ExecutionStats): self._dbState = dbState self._workerThread = None self._err = None @@ -626,19 +908,22 @@ class Task(): # Assign an incremental task serial number self._taskNum = self.allocTaskNum() + # logger.debug("Creating new task {}...".format(self._taskNum)) + + self._execStats = execStats def isSuccess(self): return self._err == None - def clone(self): - newTask = self.__class__(self._dbState) + def clone(self): # TODO: why do we need this again? + newTask = self.__class__(self._dbState, self._execStats) return newTask def logDebug(self, msg): - self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg)) def logInfo(self, msg): - self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg)) def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) @@ -652,29 +937,228 @@ class Task(): self.logDebug("[-] executing task {}...".format(self.__class__.__name__)) self._err = None + self._execStats.beginTaskType(self.__class__.__name__) # mark beginning try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - self.logDebug("[=]Taos Execution exception: {0}".format(err)) + self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err)) self._err = err except: - self.logDebug("[=]Unexpected exception") + self.logDebug("[=] Unexpected exception") raise + self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) - self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) + self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) + self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. def execSql(self, sql): return self._dbState.execute(sql) -class CreateDbTask(Task): + +class ExecutionStats: + def __init__(self): + self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task + self._tasksInProgress = 0 + self._lock = threading.Lock() + self._firstTaskStartTime = None + self._execStartTime = None + self._elapsedTime = 0.0 # total elapsed time + self._accRunTime = 0.0 # accumulated run time + + self._failed = False + self._failureReason = None + + def startExec(self): + self._execStartTime = time.time() + + def endExec(self): + self._elapsedTime = time.time() - self._execStartTime + + def incExecCount(self, klassName, isSuccess): # TODO: add a lock here + if klassName not in self._execTimes: + self._execTimes[klassName] = [0, 0] + t = self._execTimes[klassName] # tuple for the data + t[0] += 1 # index 0 has the "total" execution times + if isSuccess: + t[1] += 1 # index 1 has the "success" execution times + + def beginTaskType(self, klassName): + with self._lock: + if self._tasksInProgress == 0 : # starting a new round + self._firstTaskStartTime = time.time() # I am now the first task + self._tasksInProgress += 1 + + def endTaskType(self, klassName, isSuccess): + with self._lock: + self._tasksInProgress -= 1 + if self._tasksInProgress == 0 : # all tasks have stopped + self._accRunTime += (time.time() - self._firstTaskStartTime) + self._firstTaskStartTime = None + + def registerFailure(self, reason): + self._failed = True + self._failureReason = reason + + def logStats(self): + logger.info("----------------------------------------------------------------------") + logger.info("| Crash_Gen test {}, with the following stats:". + format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED")) + logger.info("| Task Execution Times (success/total):") + execTimesAny = 0 + for k, n in self._execTimes.items(): + execTimesAny += n[0] + logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0])) + + logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny)) + logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress)) + logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime)) + logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny)) + logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime)) + logger.info("----------------------------------------------------------------------") + + + +class StateTransitionTask(Task): + # @classmethod + # def getAllTaskClasses(cls): # static + # return cls.__subclasses__() + @classmethod + def getInfo(cls): # each sub class should supply their own information + raise RuntimeError("Overriding method expected") + + # @classmethod + # def getBeginStates(cls): + # return cls.getInfo()[0] + + @classmethod + def getEndState(cls): # returning the class name + return cls.getInfo()[0] + + @classmethod + def canBeginFrom(cls, state: AnyState): + # return state.getValue() in cls.getBeginStates() + raise RuntimeError("must be overriden") + + def execute(self, wt: WorkerThread): + super().execute(wt) + + + +class CreateDbTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_EMPTY], # can begin from + StateDbOnly() # end state + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateDb() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - wt.execSql("create database db") + wt.execSql("create database db") + +class DropDbTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + StateEmpty() + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropDb() -class DropDbTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("drop database db") + logger.debug("[OPS] database dropped at {}".format(time.time())) + +class CreateFixedSuperTableTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_DB_ONLY], + StateSuperTableOnly() + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateFixedSuperTable() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedSuperTableName() + wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(20), f float) ".format(tblName)) + # No need to create the regular tables, INSERT will do that automatically + + +class ReadFixedDataTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + None # meaning doesn't affect state + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canReadData() -class CreateTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + sTbName = self._dbState.getFixedSuperTableName() + dbc = wt.getDbConn() + dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + rTables = dbc.getQueryResult() + # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + for rTbName in rTables : # regular tables + dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + + # tdSql.query(" cars where tbname in ('carzero', 'carone')") + +class DropFixedSuperTableTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + StateDbOnly() # meaning doesn't affect state + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropFixedSuperTable() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedSuperTableName() + wt.execSql("drop table db.{}".format(tblName)) + +class AddFixedDataTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + StateHasData() + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canAddData() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + ds = self._dbState + wt.execSql("use db") # TODO: seems to be an INSERT bug to require this + for i in range(10): # 0 to 9 + sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( + i, + ds.getFixedSuperTableName(), + ds.getNextBinary(), ds.getNextFloat(), + ds.getNextTick(), ds.getNextInt()) + wt.execSql(sql) + + +#---------- Non State-Transition Related Tasks ----------# + +class CreateTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tIndex = self._dbState.addTable() self.logDebug("Creating a table {} ...".format(tIndex)) @@ -682,17 +1166,6 @@ class CreateTableTask(Task): self.logDebug("Table {} created.".format(tIndex)) self._dbState.releaseTable(tIndex) -class CreateFixedTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedTableName() - wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) - -class ReadFixedDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedTableName() - self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later - # tdSql.query(" cars where tbname in ('carzero', 'carone')") - class DropTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tableName = self._dbState.getTableNameToDelete() @@ -702,10 +1175,7 @@ class DropTableTask(Task): self.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) -class DropFixedTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedTableName() - wt.execSql("drop table db.{}".format(tblName)) + class AddDataTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): @@ -716,16 +1186,11 @@ class AddDataTask(Task): self.logInfo("No table found to add data, skipping...") return sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - self.logDebug("Executing SQL: {}".format(sql)) + self.logDebug("[SQL] Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) - self.logDebug("Finished adding data") + self.logDebug("[OPS] Finished adding data") -class AddFixedDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState - sql = "insert into db.table_{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt()) - wt.execSql(sql) # Deterministic random number generator class Dice(): @@ -760,28 +1225,45 @@ class Dice(): # Anyone needing to carry out work should simply come here -class WorkDispatcher(): - def __init__(self, dbState): - # self.totalNumMethods = 2 - self.tasks = [ - CreateTableTask(dbState), - DropTableTask(dbState), - AddDataTask(dbState), - ] +# class WorkDispatcher(): +# def __init__(self, dbState): +# # self.totalNumMethods = 2 +# self.tasks = [ +# # CreateTableTask(dbState), # Obsolete +# # DropTableTask(dbState), +# # AddDataTask(dbState), +# ] + +# def throwDice(self): +# max = len(self.tasks) - 1 +# dRes = random.randint(0, max) +# # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) +# return dRes + +# def pickTask(self): +# dice = self.throwDice() +# return self.tasks[dice] + +# def doWork(self, workerThread): +# task = self.pickTask() +# task.execute(workerThread) + +class LoggingFilter(logging.Filter): + def filter(self, record: logging.LogRecord): + if ( record.levelno >= logging.INFO ) : + return True # info or above always log + + msg = record.msg + # print("type = {}, value={}".format(type(msg), msg)) + # sys.exit() + + # Commenting out below to adjust... + + # if msg.startswith("[TRD]"): + # return False + return True - def throwDice(self): - max = len(self.tasks) - 1 - dRes = random.randint(0, max) - # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) - return dRes - - def pickTask(self): - dice = self.throwDice() - return self.tasks[dice] - - def doWork(self, workerThread): - task = self.pickTask() - task.execute(workerThread) + def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html @@ -810,9 +1292,12 @@ def main(): sys.exit() global logger - logger = logging.getLogger('myApp') + logger = logging.getLogger('CrashGen') + logger.addFilter(LoggingFilter()) if ( gConfig.debug ): logger.setLevel(logging.DEBUG) # default seems to be INFO + else: + logger.setLevel(logging.INFO) ch = logging.StreamHandler() logger.addHandler(ch) @@ -820,12 +1305,21 @@ def main(): Dice.seed(0) # initial seeding of dice tc = ThreadCoordinator( ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), - WorkDispatcher(dbState), + # WorkDispatcher(dbState), # Obsolete? dbState ) + + # Sandbox testing code + # dbc = dbState.getDbConn() + # while True: + # rows = dbc.query("show databases") + # print("Rows: {}, time={}".format(rows, time.time())) + tc.run() - dbState.cleanUp() - logger.info("Finished running thread pool") + tc.logStats() + dbState.cleanUp() + + # logger.info("Crash_Gen execution finished") if __name__ == "__main__": main() diff --git a/tests/pytest/query/filterCombo.py b/tests/pytest/query/filterCombo.py index c25b6156b85484096723bb74ba4e8e48c2adad45..c9c7ade73eece9a33ae99177dc680463e22d2701 100644 --- a/tests/pytest/query/filterCombo.py +++ b/tests/pytest/query/filterCombo.py @@ -49,7 +49,8 @@ class TDTestCase: tdSql.error("select * from db.st where ts > '2020-05-13 10:00:00.002' OR tagtype < 2") # illegal condition - tdSql.error("select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2") + tdSql.error("select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2") + tdSql.error("select * from db.st where tagtype <> 1 OR tagtype < 2") def stop(self): tdSql.close() diff --git a/tests/pytest/query/queryError.py b/tests/pytest/query/queryError.py index d0515ef34dd14ba5b78c43c7023622f9978a319e..95924f48cc062b3756ab5e80fb72b45f707f8939 100644 --- a/tests/pytest/query/queryError.py +++ b/tests/pytest/query/queryError.py @@ -41,13 +41,12 @@ class TDTestCase: ('2020-05-13 10:00:00.002', 3, 'third') dev_002 VALUES('2020-05-13 10:00:00.003', 1, 'first'), ('2020-05-13 10:00:00.004', 2, 'second'), ('2020-05-13 10:00:00.005', 3, 'third')""") - """Error expected here, but no errors + # query first .. as .. tdSql.error("select first(*) as one from st") # query last .. as .. - tdSql.error("select last(*) as latest from st") - """ + tdSql.error("select last(*) as latest from st") # query last row .. as .. tdSql.error("select last_row as latest from st") diff --git a/tests/pytest/query/queryNormal.py b/tests/pytest/query/queryNormal.py index 3ca9c1d1357de6de3eb63347a2a104fd947a3e66..442661f72a4510e9d311ee38bbc16a204bf71670 100644 --- a/tests/pytest/query/queryNormal.py +++ b/tests/pytest/query/queryNormal.py @@ -31,14 +31,22 @@ class TDTestCase: tdSql.execute("create table stb1 (ts timestamp, c1 int, c2 float) tags(t1 int, t2 binary(10), t3 nchar(10))") tdSql.execute("insert into tb1 using stb1 tags(1,'tb1', '表1') values ('2020-04-18 15:00:00.000', 1, 0.1), ('2020-04-18 15:00:01.000', 2, 0.1)") tdSql.execute("insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)") - - # join 2 tables -- bug exists - # tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts") - # tdSql.checkRows(1) + + # inner join --- bug + tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts") + tdSql.checkRows(1) # join 3 tables -- bug exists - # tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_p.id, stb_p.dscrption, stb_p.pressure,stb_v.velocity from stb_p, stb_t, stb_v where stb_p.ts=stb_t.ts and stb_p.ts=stb_v.ts and stb_p.id = stb_t.id") + tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_p.id, stb_p.dscrption, stb_p.pressure,stb_v.velocity from stb_p, stb_t, stb_v where stb_p.ts=stb_t.ts and stb_p.ts=stb_v.ts and stb_p.id = stb_t.id") + # query show stable + tdSql.query("show stables") + tdSql.checkRows(1) + + # query show tables + tdSql.query("show table") + tdSql.checkRows(2) + # query count tdSql.query("select count(*) from stb1") tdSql.checkData(0, 0, 4) @@ -51,6 +59,10 @@ class TDTestCase: tdSql.query("select last(*) from stb1") tdSql.checkData(0, 1, 4) + # query last_row + tdSql.query("select last_row(*) from stb1") + tdSql.checkData(0, 1, 4) + # query as tdSql.query("select t2 as number from stb1") tdSql.checkRows(2) @@ -63,6 +75,10 @@ class TDTestCase: tdSql.query("select last(*) as end from stb1") tdSql.checkData(0, 1, 4) + # query last_row ... as + tdSql.query("select last_row(*) as end from stb1") + tdSql.checkData(0, 1, 4) + # query group .. by tdSql.query("select sum(c1), t2 from stb1 group by t2") tdSql.checkRows(2) @@ -75,6 +91,34 @@ class TDTestCase: tdSql.query("select * from stb1 limit 2 offset 3") tdSql.checkRows(1) + # query ... alias for table ---- bug + tdSql.query("select t.ts from tb1 t") + tdSql.checkRows(2) + + # query ... tbname + tdSql.query("select tbname from stb1") + tdSql.checkRows(2) + + # query ... tbname count ---- bug + tdSql.query("select count(tbname) from stb1") + tdSql.checkRows(2) + + # query ... select database ---- bug + tdSql.query("SELECT database()") + tdSql.checkRows(1) + + # query ... select client_version ---- bug + tdSql.query("SELECT client_version()") + tdSql.checkRows(1) + + # query ... select server_version ---- bug + tdSql.query("SELECT server_version()") + tdSql.checkRows(1) + + # query ... select server_status ---- bug + tdSql.query("SELECT server_status()") + tdSql.checkRows(1) + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__)