diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 7534a324099fff01cf4ca47a1da51800ec3c3fb7..1ab14a4eae1758652f896436bbc40efc1133f10c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -579,9 +579,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_FETCH) && - (pRes->code != TSDB_CODE_QUERY_CANCELLED && ((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || - (pRes->code == TSDB_CODE_SUCCESS && pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)))) { + pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS && + ((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || + (pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows, diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4c8722ecea421d1509a99e7396b145dbb266b5d4..1925546222520e8714cf8cc46fc11ef6788e4eb6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1850,8 +1850,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void STableMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { - STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); - // todo handle error + STableMeta* pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMetaInfo->pTableMeta); // acquire by name may fail due to cache cleanup assert(pTableMeta != NULL); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList); } else { // transfer the ownership of pTableMeta to the newly create sql object. 
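The tscUtil.c hunk above swaps a name-based cache lookup for an acquire keyed on the pointer the parent query already holds, so cloning a subquery no longer trips the `assert` when the name entry has been evicted. The sketch below is a minimal, self-contained model of that idea only; it is not the TDengine `taosCache` implementation, and `CacheEntry`, `acquireByName`, and `acquireByData` are illustrative stand-ins.

```c
/* Toy reference-counted cache, illustrating the difference between the two
 * acquire paths. NOT the TDengine taosCache implementation; all names here
 * are illustrative stand-ins. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct CacheEntry {
    char name[32];
    int  refCount;
    int  evicted;   /* set once cache cleanup has removed the name key */
} CacheEntry;

/* Acquire by name: misses once the entry has been dropped from the name index. */
static CacheEntry *acquireByName(CacheEntry **table, int n, const char *name) {
    for (int i = 0; i < n; ++i) {
        CacheEntry *e = table[i];
        if (e != NULL && !e->evicted && strcmp(e->name, name) == 0) {
            e->refCount++;
            return e;
        }
    }
    return NULL;  /* cache cleanup already removed the key */
}

/* Acquire by data pointer: always succeeds while the caller still holds a ref. */
static CacheEntry *acquireByData(CacheEntry *held) {
    if (held == NULL) return NULL;
    held->refCount++;  /* the object cannot be freed while refCount > 0 */
    return held;
}

static void release(CacheEntry *e) {
    if (e != NULL && --e->refCount == 0 && e->evicted) {
        free(e);  /* last reference gone and the cache no longer tracks it */
    }
}

int main(void) {
    CacheEntry *meta = calloc(1, sizeof(CacheEntry));
    strcpy(meta->name, "db.table1");
    meta->refCount = 1;              /* reference held by the parent query */
    CacheEntry *table[1] = { meta };

    meta->evicted = 1;               /* simulate cache cleanup between queries */

    CacheEntry *byName = acquireByName(table, 1, "db.table1");  /* old path */
    printf("acquire by name: %s\n", byName ? "ok" : "missed");

    CacheEntry *byData = acquireByData(meta);                   /* new path */
    printf("acquire by data: %s\n", byData ? "ok" : "missed");

    release(byData);
    release(meta);                   /* parent's reference dropped last */
    return 0;
}
```

An alternative, used in the `pPrevSql != NULL` branch of the same function, is to transfer ownership of the already-held reference instead of taking an extra one.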
diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index aacf8f419ff445e7ec1d5ef4f6066a52336fbd9a..594fd3fd50b0adede5e5ff759a43e8f2ddba9ec2 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -220,6 +220,7 @@ typedef struct SAcctObj { typedef struct { int8_t type; + int32_t index; char db[TSDB_DB_NAME_LEN + 1]; void * pIter; int16_t numOfColumns; @@ -228,7 +229,6 @@ typedef struct { int32_t numOfReads; int16_t offset[TSDB_MAX_COLUMNS]; int16_t bytes[TSDB_MAX_COLUMNS]; - void * signature; uint16_t payloadLen; char payload[]; } SShowObj; diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index ac8eb73296ea3ffd85b6a285b30b0dd3dae48258..d61145d9b82ffa643a52c0e4feb6c75443079153 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -27,7 +27,8 @@ void mnodeCleanupVgroups(); SVgObj *mnodeGetVgroup(int32_t vgId); void mnodeIncVgroupRef(SVgObj *pVgroup); void mnodeDecVgroupRef(SVgObj *pVgroup); -void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg); +void mnodeDropAllDbVgroups(SDbObj *pDropDb); +void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb); void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode); void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index b904e06e977ef56cfb21480b05332a5502cdee2e..0ad835279e44d76eeaf8aa7ee9f2cdc551bdda54 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -81,10 +81,10 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) { SDbObj *pDb = pOper->pObj; SAcctObj *pAcct = mnodeGetAcct(pDb->acct); - mnodeDropDbFromAcct(pAcct, pDb); mnodeDropAllChildTables(pDb); mnodeDropAllSuperTables(pDb); - mnodeDropAllDbVgroups(pDb, false); + mnodeDropAllDbVgroups(pDb); + mnodeDropDbFromAcct(pAcct, pDb); mnodeDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; @@ -998,19 +998,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { return code; } -#if 1 - mnodeDropAllDbVgroups(pMsg->pDb, true); -#else - SVgObj *pVgroup = pMsg->pDb->pHead; - if (pVgroup != NULL) { - mPrint("vgId:%d, will be dropped", pVgroup->vgId); - SMnodeMsg *newMsg = mnodeCloneMsg(pMsg); - newMsg->ahandle = pVgroup; - newMsg->expected = pVgroup->numOfVnodes; - mnodeDropVgroup(pVgroup, newMsg); - return; - } -#endif + mnodeSendDropAllDbVgroupsMsg(pMsg->pDb); mTrace("db:%s, all vgroups is dropped", pMsg->pDb->name); return mnodeDropDb(pMsg); diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index d28d0b5b307e47b68ad18c4c33abe8cf2d5e9d15..20616bfbcdae827f6bb2ca31273d349650f704f2 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -47,13 +47,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg); static void mnodeFreeShowObj(void *data); -static bool mnodeCheckShowObj(SShowObj *pShow); +static bool mnodeAccquireShowObj(SShowObj *pShow); static bool mnodeCheckShowFinished(SShowObj *pShow); -static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size); -static void mnodeCleanupShowObj(void *pShow, bool forceRemove); +static void *mnodePutShowObj(SShowObj *pShow, int32_t size); +static void mnodeReleaseShowObj(void *pShow, bool forceRemove); extern void *tsMnodeTmr; -static void *tsQhandleCache = NULL; +static void *tsMnodeShowCache = NULL; +static int32_t tsShowObjIndex = 0; static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; @@ -64,14 +65,15 @@ int32_t mnodeInitShow() { 
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsQhandleCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); + tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); return 0; } void mnodeCleanUpShow() { - if (tsQhandleCache != NULL) { - taosCacheCleanup(tsQhandleCache); - tsQhandleCache = NULL; + if (tsMnodeShowCache != NULL) { + mPrint("show cache is cleanup"); + taosCacheCleanup(tsMnodeShowCache); + tsMnodeShowCache = NULL; } } @@ -118,13 +120,12 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen); SShowObj *pShow = (SShowObj *) calloc(1, showObjSize); - pShow->signature = pShow; pShow->type = pShowMsg->type; pShow->payloadLen = htons(pShowMsg->payloadLen); strcpy(pShow->db, pShowMsg->db); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - pShow = mnodeSaveShowObj(pShow, showObjSize); + pShow = mnodePutShowObj(pShow, showObjSize); if (pShow == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -132,21 +133,22 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; SCMShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { - mnodeFreeShowObj(pShow); + mnodeReleaseShowObj(pShow, true); return TSDB_CODE_SERV_OUT_OF_MEMORY; } pShowRsp->qhandle = htobe64((uint64_t) pShow); - mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); + mTrace("%p, show type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle); if (code == 0) { pMsg->rpcRsp.rsp = pShowRsp; pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns; + mnodeReleaseShowObj(pShow, false); return TSDB_CODE_SUCCESS; } else { - mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); + mError("%p, show type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); rpcFreeCont(pShowRsp); - mnodeCleanupShowObj(pShow, true); + mnodeReleaseShowObj(pShow, true); return code; } } @@ -159,22 +161,20 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { pRetrieve->qhandle = htobe64(pRetrieve->qhandle); SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; - mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); + mTrace("%p, show type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type)); /* * in case of server restart, apps may hold qhandle created by server before * restart, which is actually invalid, therefore, signature check is required. 
*/ - if (!mnodeCheckShowObj(pShow)) { - mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pShow); + if (!mnodeAccquireShowObj(pShow)) { + mError("%p, show is invalid", pShow); return TSDB_CODE_INVALID_QHANDLE; } if (mnodeCheckShowFinished(pShow)) { - mTrace("retrieve:%p, qhandle:%p already read finished, numOfReads:%d numOfRows:%d", pRetrieve, pShow, pShow->numOfReads, pShow->numOfRows); + mTrace("%p, show is already read finished, numOfReads:%d numOfRows:%d", pShow, pShow->numOfReads, pShow->numOfRows); pShow->numOfReads = pShow->numOfRows; - //mnodeCleanupShowObj(pShow, true); - //return TSDB_CODE_SUCCESS; } if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -200,7 +200,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { if (rowsRead < 0) { rpcFreeCont(pRsp); - mnodeCleanupShowObj(pShow, false); + mnodeReleaseShowObj(pShow, false); assert(false); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -211,12 +211,13 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.len = size; - if (rowsToRead == 0) { - mnodeCleanupShowObj(pShow, true); + if (rowsToRead == 0 || rowsRead == rowsToRead) { + pRsp->completed = 1; + mnodeReleaseShowObj(pShow, true); } else { - mnodeCleanupShowObj(pShow, false); + mnodeReleaseShowObj(pShow, false); } - + return TSDB_CODE_SUCCESS; } @@ -318,24 +319,29 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) { return false; } -static bool mnodeCheckShowObj(SShowObj *pShow) { - SShowObj *pSaved = taosCacheAcquireByData(tsQhandleCache, pShow); +static bool mnodeAccquireShowObj(SShowObj *pShow) { + char key[10]; + sprintf(key, "%d", pShow->index); + + SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key); if (pSaved == pShow) { + mTrace("%p, show is accquired from cache", pShow); return true; } else { - mTrace("show:%p, is already released", pShow); return false; } } -static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) { - if (tsQhandleCache != NULL) { - char key[24]; - sprintf(key, "show:%p", pShow); - SShowObj *newQhandle = taosCachePut(tsQhandleCache, key, pShow, size, 60); +static void *mnodePutShowObj(SShowObj *pShow, int32_t size) { + if (tsMnodeShowCache != NULL) { + char key[10]; + pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); + sprintf(key, "%d", pShow->index); + + SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 60); free(pShow); - mTrace("show:%p, is saved", newQhandle); + mTrace("%p, show is put into cache", newQhandle); return newQhandle; } @@ -345,10 +351,10 @@ static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) { static void mnodeFreeShowObj(void *data) { SShowObj *pShow = data; sdbFreeIter(pShow->pIter); - mTrace("show:%p, is destroyed", pShow); + mTrace("%p, show is destroyed", pShow); } -static void mnodeCleanupShowObj(void *pShow, bool forceRemove) { - mTrace("show:%p, is released, force:%s", pShow, forceRemove ? "true" : "false"); - taosCacheRelease(tsQhandleCache, &pShow, forceRemove); +static void mnodeReleaseShowObj(void *pShow, bool forceRemove) { + mTrace("%p, show is released, force:%s", pShow, forceRemove ? 
"true" : "false"); + taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove); } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 712b8c2a8ebd4e22aad185220831ecbce7305891..84a1f659d58d6994940de5b568932403385f004c 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -148,37 +148,30 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) { return TSDB_CODE_INVALID_VGROUP_ID; } - SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - return TSDB_CODE_INVALID_VGROUP_ID; - } - mnodeDecVgroupRef(pVgroup); - - SDbObj *pDb = mnodeGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("ctable:%s, vgId:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); - return TSDB_CODE_INVALID_DB; - } - mnodeDecDbRef(pDb); - - SAcctObj *pAcct = mnodeGetAcct(pDb->acct); - if (pAcct == NULL) { - mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct); - return TSDB_CODE_INVALID_ACCT; - } - mnodeDecAcctRef(pAcct); + SVgObj *pVgroup = NULL; + SDbObj *pDb = NULL; + SAcctObj *pAcct = NULL; + + pVgroup = mnodeGetVgroup(pTable->vgId); + if (pVgroup != NULL) pDb = mnodeGetDb(pVgroup->dbName); + if (pDb != NULL) pAcct = mnodeGetAcct(pDb->acct); if (pTable->info.type == TSDB_CHILD_TABLE) { grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); - pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); + if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); mnodeRemoveTableFromStable(pTable->superTable, pTable); mnodeDecTableRef(pTable->superTable); } else { grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); - pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); + if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); } - mnodeRemoveTableFromDb(pDb); - mnodeRemoveTableFromVgroup(pVgroup, pTable); + + if (pDb != NULL) mnodeRemoveTableFromDb(pDb); + if (pVgroup != NULL) mnodeRemoveTableFromVgroup(pVgroup, pTable); + + mnodeDecVgroupRef(pVgroup); + mnodeDecDbRef(pDb); + mnodeDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; } @@ -693,10 +686,10 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { } if (pCreate->numOfTags != 0) { - mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); + mTrace("table:%s, create stable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateSuperTableMsg(pMsg); } else { - mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); + mTrace("table:%s, create ctable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateChildTableMsg(pMsg); } } @@ -1276,9 +1269,9 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i; SSuperTableObj *pTable = mnodeGetSuperTable(stableName); - if (pTable->vgHash != NULL) { + if (pTable != NULL && pTable->vgHash != NULL) { contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo)); - } + } mnodeDecTableRef(pTable); } @@ -1287,12 +1280,23 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } - pRsp->numOfTables = htonl(numOfTable); - char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg); + pRsp->numOfTables = 0; + char *msg = 
(char *)pRsp + sizeof(SCMSTableVgroupRspMsg); for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i; SSuperTableObj *pTable = mnodeGetSuperTable(stableName); + if (pTable == NULL) { + mError("stable:%s, not exist while get stable vgroup info", stableName); + mnodeDecTableRef(pTable); + continue; + } + if (pTable->vgHash == NULL) { + mError("stable:%s, not vgroup exist while get stable vgroup info", stableName); + mnodeDecTableRef(pTable); + continue; + } + SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg; SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash); @@ -1318,17 +1322,25 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { } taosHashDestroyIter(pIter); + mnodeDecTableRef(pTable); pVgroupInfo->numOfVgroups = htonl(vgSize); // one table is done, try the next table msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo); + pRsp->numOfTables++; } - pMsg->rpcRsp.rsp = pRsp; - pMsg->rpcRsp.len = msg - (char *)pRsp; + if (pRsp->numOfTables != numOfTable) { + rpcFreeCont(pRsp); + return TSDB_CODE_INVALID_TABLE_ID; + } else { + pRsp->numOfTables = htonl(pRsp->numOfTables); + pMsg->rpcRsp.rsp = pRsp; + pMsg->rpcRsp.len = msg - (char *)pRsp; - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; + } } static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) { @@ -1429,7 +1441,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb SSuperTableObj *pSuperTable = mnodeGetSuperTable(pTagData->name); if (pSuperTable == NULL) { mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name); - free(pTable); + mnodeDestroyChildTable(pTable); terrno = TSDB_CODE_INVALID_TABLE_ID; return NULL; } @@ -1738,7 +1750,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; - STagData* pTag = (STagData*)pInfo->tags; + STagData *pTag = (STagData *)pInfo->tags; int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen); SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); @@ -1754,14 +1766,13 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { pCreateMsg->contLen = htonl(contLen); memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg)); + mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, ((STagData *)(pCreateMsg->schema))->name); rpcFreeCont(pMsg->rpcMsg.pCont); pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; pMsg->rpcMsg.pCont = pCreateMsg; pMsg->rpcMsg.contLen = contLen; - - mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, pInfo->tags); - + return TSDB_CODE_ACTION_NEED_REPROCESSED; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index fb6250e7c05f664af25f15ec996d4cea43708891..9c24f7eaf3e3e6d0d88e2a27fe4f9b72db1956f2 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -784,7 +784,7 @@ void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb) { mPrint("db:%s, all vgroups is updated in sdb", pAlterDb->name); } -void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { +void mnodeDropAllDbVgroups(SDbObj *pDropDb) { void * pIter = NULL; int32_t numOfVgroups = 0; SVgObj *pVgroup = NULL; @@ -802,10 +802,6 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { }; sdbDeleteRow(&oper); numOfVgroups++; - - if (sendMsg) { - 
mnodeSendDropVgroupMsg(pVgroup, NULL); - } } mnodeDecVgroupRef(pVgroup); @@ -815,3 +811,25 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups); } + +void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb) { + void * pIter = NULL; + int32_t numOfVgroups = 0; + SVgObj *pVgroup = NULL; + + mPrint("db:%s, all vgroups will be dropped in dnode", pDropDb->name); + while (1) { + pIter = mnodeGetNextVgroup(pIter, &pVgroup); + if (pVgroup == NULL) break; + + if (pVgroup->pDb == pDropDb) { + mnodeSendDropVgroupMsg(pVgroup, NULL); + } + + mnodeDecVgroupRef(pVgroup); + } + + sdbFreeIter(pIter); + + mPrint("db:%s, all vgroups:%d drop msg is sent to dnode", pDropDb->name, numOfVgroups); +} \ No newline at end of file diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 44c978957e0ccf49805ded9e5c26ba0bfb3a656b..2e5d84d45f1cc7545c0af509087f930491d4bd0b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -14,6 +14,7 @@ from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel import sys +import traceback # Require Python 3 if sys.version_info[0] < 3: raise Exception("Must be using Python 3") @@ -24,11 +25,13 @@ import copy import threading import random +import time import logging import datetime import textwrap from typing import List +from typing import Dict from util.log import * from util.dnodes import * @@ -45,6 +48,14 @@ logger = None def runThread(wt: WorkerThread): wt.run() +class CrashGenError(Exception): + def __init__(self, msg=None, errno=None): + self.msg = msg + self.errno = errno + + def __str__(self): + return self.msg + class WorkerThread: def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, @@ -63,10 +74,10 @@ class WorkerThread: self._dbConn = DbConn() def logDebug(self, msg): - logger.info(" t[{}] {}".format(self._tid, msg)) + logger.debug(" TRD[{}] {}".format(self._tid, msg)) def logInfo(self, msg): - logger.info(" t[{}] {}".format(self._tid, msg)) + logger.info(" TRD[{}] {}".format(self._tid, msg)) def getTaskExecutor(self): @@ -95,15 +106,19 @@ class WorkerThread: while True: tc = self._tc # Thread Coordinator, the overall master tc.crossStepBarrier() # shared barrier first, INCLUDING the last one - logger.debug("Thread task loop exited barrier...") + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped - logger.debug("Thread task loop exited step gate...") + logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): + logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() + logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) + logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): @@ -123,7 +138,7 @@ class WorkerThread: self.verifyThreadSelf() # only allowed by ourselves # Wait again at the "gate", waiting to be "tapped" - # logger.debug("Worker thread {} about to cross the 
step gate".format(self._tid)) + logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid)) self._stepGate.wait() self._stepGate.clear() @@ -133,33 +148,40 @@ class WorkerThread: self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - logger.debug("Tapping worker thread {}".format(self._tid)) + logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) self._stepGate.set() # wake up! time.sleep(0) # let the released thread run a bit - def execSql(self, sql): # not "execute", since we are out side the DB context + def execSql(self, sql): # TODO: expose DbConn directly if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: return self._tc.getDbState().getDbConn().execute(sql) - def querySql(self, sql): # not "execute", since we are out side the DB context + def getDbConn(self): if ( gConfig.per_thread_db_connection ): - return self._dbConn.query(sql) + return self._dbConn else: - return self._tc.getDbState().getDbConn().query(sql) - + return self._tc.getDbState().getDbConn() + + # def querySql(self, sql): # not "execute", since we are out side the DB context + # if ( gConfig.per_thread_db_connection ): + # return self._dbConn.query(sql) + # else: + # return self._tc.getDbState().getDbConn().query(sql) + class ThreadCoordinator: - def __init__(self, pool, wd: WorkDispatcher, dbState): + def __init__(self, pool, dbState): self._curStep = -1 # first step is 0 self._pool = pool - self._wd = wd + # self._wd = wd self._te = None # prepare for every new step self._dbState = dbState self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads + self._execStats = ExecutionStats() def getTaskExecutor(self): return self._te @@ -176,41 +198,63 @@ class ThreadCoordinator: # Coordinate all threads step by step self._curStep = -1 # not started yet maxSteps = gConfig.max_steps # type: ignore - while(self._curStep < maxSteps): - print(".", end="", flush=True) - logger.debug("Main thread going to sleep") + self._execStats.startExec() # start the stop watch + failed = False + while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 + if not gConfig.debug: + print(".", end="", flush=True) # print this only if we are not in debug mode + logger.debug("[TRD] Main thread going to sleep") # Now ready to enter a step self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate self._stepBarrier.reset() # Other worker threads should now be at the "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + try: + self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + except taos.error.ProgrammingError as err: + if ( err.msg == 'network unavailable' ): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + failed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at end, and maybe signal them to stop + else: + raise + finally: + pass + self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step - logger.info("<-- Step {} finished".format(self._curStep)) + 
logger.debug("<-- Step {} finished".format(self._curStep)) self._curStep += 1 # we are about to get into next step. TODO: race condition here! logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep # A new TE for the new step - self._te = TaskExecutor(self._curStep) + if not failed: # only if not failed + self._te = TaskExecutor(self._curStep) - logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep + logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep self.tapAllThreads() logger.debug("Main thread ready to finish up...") - self.crossStepBarrier() # Cross it one last time, after all threads finish - self._stepBarrier.reset() - logger.debug("Main thread in exclusive zone...") - self._te = None # No more executor, time to end - logger.debug("Main thread tapping all threads one last time...") - self.tapAllThreads() # Let the threads run one last time + if not failed: # only in regular situations + self.crossStepBarrier() # Cross it one last time, after all threads finish + self._stepBarrier.reset() + logger.debug("Main thread in exclusive zone...") + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish + logger.info("All worker thread finished") + self._execStats.endExec() - logger.info("All threads finished") - print("\r\nFinished") + def logStats(self): + self._execStats.logStats() def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -219,7 +263,7 @@ class ThreadCoordinator: wakeSeq.append(i) else: wakeSeq.insert(0, i) - logger.info("Waking up threads: {}".format(str(wakeSeq))) + logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq))) # TODO: set dice seed to a deterministic value for i in wakeSeq: self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! @@ -233,11 +277,15 @@ class ThreadCoordinator: raise RuntimeError("Cannot fetch task when not running") # return self._wd.pickTask() # Alternatively, let's ask the DbState for the appropriate task - dbState = self.getDbState() - tasks = dbState.getTasksAtState() - i = Dice.throw(len(tasks)) - # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. - return tasks[i].clone() + # dbState = self.getDbState() + # tasks = dbState.getTasksAtState() # TODO: create every time? + # nTasks = len(tasks) + # i = Dice.throw(nTasks) + # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) + # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + # return tasks[i].clone() # TODO: still necessary? + taskType = self.getDbState().pickTaskType() # pick a task type for current state + return taskType(self.getDbState(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -253,7 +301,7 @@ class ThreadPool: self.maxSteps = maxSteps self.funcSequencer = funcSequencer # Internal class variables - self.dispatcher = WorkDispatcher(dbState) + # self.dispatcher = WorkDispatcher(dbState) # Obsolete? 
self.curStep = 0 self.threadList = [] # self.stepGate = threading.Condition() # Gate to hold/sync all threads @@ -362,7 +410,7 @@ class DbConn: # Get the connection/cursor ready self._cursor.execute('reset query cache') - # self._cursor.execute('use db') + # self._cursor.execute('use db') # note we do this in _findCurrenState # Open connection self._tdSql = TDSql() @@ -390,28 +438,265 @@ class DbConn: def execute(self, sql): if ( not self.isOpen ): raise RuntimeError("Cannot execute database commands until connection is open") - return self._tdSql.execute(sql) + logger.debug("[SQL] Executing SQL: {}".format(sql)) + nRows = self._tdSql.execute(sql) + logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + return nRows - def query(self, sql) -> int : # return number of rows retrieved + def query(self, sql) : # return rows affected if ( not self.isOpen ): raise RuntimeError("Cannot query database until connection is open") - return self._tdSql.query(sql) + logger.debug("[SQL] Executing SQL: {}".format(sql)) + nRows = self._tdSql.query(sql) + logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + return nRows + # results are in: return self._tdSql.queryResult + def getQueryResult(self): + return self._tdSql.queryResult -# State of the database as we believe it to be -class DbState(): + def _queryAny(self, sql) : # actual query result as an int + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") + tSql = self._tdSql + nRows = tSql.query(sql) + if nRows != 1 : + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) + if tSql.queryRows != 1 or tSql.queryCols != 1: + raise RuntimeError("Unexpected result set for query: {}".format(sql)) + return tSql.queryResult[0][0] + + def queryScalar(self, sql) -> int : + return self._queryAny(sql) + + def queryString(self, sql) -> str : + return self._queryAny(sql) + +class AnyState: STATE_INVALID = -1 - STATE_EMPTY = 1 # nothing there, no even a DB - STATE_DB_ONLY = 2 # we have a DB, but nothing else - STATE_TABLE_ONLY = 3 # we have a table, but totally empty - STATE_HAS_DATA = 4 # we have some data in the table + STATE_EMPTY = 0 # nothing there, no even a DB + STATE_DB_ONLY = 1 # we have a DB, but nothing else + STATE_TABLE_ONLY = 2 # we have a table, but totally empty + STATE_HAS_DATA = 3 # we have some data in the table + _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"] + + STATE_VAL_IDX = 0 + CAN_CREATE_DB = 1 + CAN_DROP_DB = 2 + CAN_CREATE_FIXED_SUPER_TABLE = 3 + CAN_DROP_FIXED_SUPER_TABLE = 4 + CAN_ADD_DATA = 5 + CAN_READ_DATA = 6 + + def __init__(self): + self._info = self.getInfo() + + def __str__(self): + return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] # -1 hack to accomodate the STATE_INVALID case + + def getInfo(self): + raise RuntimeError("Must be overriden by child classes") + + def equals(self, other): + if isinstance(other, int): + return self.getValIndex() == other + elif isinstance(other, AnyState): + return self.getValIndex() == other.getValIndex() + else: + raise RuntimeError("Unexpected comparison, type = {}".format(type(other))) + + def verifyTasksToState(self, tasks, newState): + raise RuntimeError("Must be overriden by child classes") + + def getValIndex(self): + return self._info[self.STATE_VAL_IDX] + + def getValue(self): + return self._info[self.STATE_VAL_IDX] + def canCreateDb(self): + return self._info[self.CAN_CREATE_DB] + def canDropDb(self): + return 
self._info[self.CAN_DROP_DB] + def canCreateFixedSuperTable(self): + return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] + def canDropFixedSuperTable(self): + return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] + def canAddData(self): + return self._info[self.CAN_ADD_DATA] + def canReadData(self): + return self._info[self.CAN_READ_DATA] + + def assertAtMostOneSuccess(self, tasks, cls): + sCnt = 0 + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + # task.logDebug("Task success found") + sCnt += 1 + if ( sCnt >= 2 ): + raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + + def assertIfExistThenSuccess(self, tasks, cls): + sCnt = 0 + exists = False + for task in tasks : + if not isinstance(task, cls): + continue + exists = True # we have a valid instance + if task.isSuccess(): + sCnt += 1 + if ( exists and sCnt <= 0 ): + raise RuntimeError("Unexpected zero success for task: {}".format(cls)) + + def assertNoTask(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__)) + + def assertNoSuccess(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + if task.isSuccess(): + raise RuntimeError("Unexpected successful task: {}".format(cls)) + + def hasSuccess(self, tasks, cls): + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + return True + return False + + def hasTask(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + return True + return False + +class StateInvalid(AnyState): + def getInfo(self): + return [ + self.STATE_INVALID, + False, False, # can create/drop Db + False, False, # can create/drop fixed table + False, False, # can insert/read data with fixed table + ] + + # def verifyTasksToState(self, tasks, newState): +class StateEmpty(AnyState): + def getInfo(self): + return [ + self.STATE_EMPTY, + True, False, # can create/drop Db + False, False, # can create/drop fixed table + False, False, # can insert/read data with fixed table + ] + + def verifyTasksToState(self, tasks, newState): + if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB + if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks + self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. 
TODO: compare numbers + +class StateDbOnly(AnyState): + def getInfo(self): + return [ + self.STATE_DB_ONLY, + False, True, + True, False, + False, False, + ] + + def verifyTasksToState(self, tasks, newState): + self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases + self.assertIfExistThenSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases + # Nothing to be said about adding data task + if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess + self.assertAtMostOneSuccess(tasks, DropDbTask) + # self._state = self.STATE_EMPTY + elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful + self.assertNoTask(tasks, DropDbTask) # should have have tried + # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet + # # can't say there's add-data attempts, since they may all fail + # self._state = self.STATE_TABLE_ONLY + # else: + # self._state = self.STATE_HAS_DATA + # What about AddFixedData? + # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): + # self._state = self.STATE_HAS_DATA + # else: # no success in dropping db tasks, no success in create fixed table? read data should also fail + # # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks, + # self._state = self.STATE_DB_ONLY # no change + +class StateSuperTableOnly(AnyState): + def getInfo(self): + return [ + self.STATE_TABLE_ONLY, + False, True, + False, True, + True, True, + ] + + def verifyTasksToState(self, tasks, newState): + if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) + # self._state = self.STATE_DB_ONLY + # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data + # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases + # self._state = self.STATE_HAS_DATA + # elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data + # self.assertNoTask(tasks, DropFixedTableTask) + # self.assertNoTask(tasks, AddFixedDataTask) + # self._state = self.STATE_TABLE_ONLY # no change + # else: # did not drop table, did not insert data, did not read successfully, that is impossible + # raise RuntimeError("Unexpected no-success scenarios") + # TODO: need to revamp!! 
+ +class StateHasData(AnyState): + def getInfo(self): + return [ + self.STATE_HAS_DATA, + False, True, + False, True, + True, True, + ] + + def verifyTasksToState(self, tasks, newState): + if ( newState.equals(AnyState.STATE_EMPTY) ): + self.hasSuccess(tasks, DropDbTask) + self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only + if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task + self.assertNoTask(tasks, DropDbTask) # we must have drop_db task + self.hasSuccess(tasks, DropFixedSuperTableTask) + self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy + elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted + self.assertNoTask(tasks, DropDbTask) + self.assertNoTask(tasks, DropFixedSuperTableTask) + self.assertNoTask(tasks, AddFixedDataTask) + # self.hasSuccess(tasks, DeleteDataTasks) + else: + self.assertNoTask(tasks, DropDbTask) + self.assertNoTask(tasks, DropFixedSuperTableTask) + self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) + + +# State of the database as we believe it to be +class DbState(): + def __init__(self): self.tableNumQueue = LinearQueue() self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - self._state = self.STATE_INVALID + + self._state = StateInvalid() # starting state + self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. # self.openDbServerConnection() self._dbConn = DbConn() @@ -425,10 +710,10 @@ class DbState(): else: raise except: - print("[=]Unexpected exception") + print("[=] Unexpected exception") raise self._dbConn.resetDb() # drop and recreate DB - self._state = self.STATE_EMPTY # initial state, the result of above + self._state = StateEmpty() # initial state, the result of above def getDbConn(self): return self._dbConn @@ -441,8 +726,8 @@ class DbState(): tIndex = self.tableNumQueue.push() return tIndex - def getFixedTableName(self): - return "fixed_table" + def getFixedSuperTableName(self): + return "fs_table" def releaseTable(self, i): # return the table back, so others can use it self.tableNumQueue.release(i) @@ -456,6 +741,12 @@ class DbState(): with self._lock: self._lastInt += 1 return self._lastInt + + def getNextBinary(self): + return "Los_Angeles_{}".format(self.getNextInt()) + + def getNextFloat(self): + return 0.9 + self.getNextInt() def getTableNameToDelete(self): tblNum = self.tableNumQueue.pop() # TODO: race condition! @@ -464,134 +755,124 @@ class DbState(): return "table_{}".format(tblNum) - def execSql(self, sql): # using the main DB connection - return self._dbConn.execute(sql) - def cleanUp(self): self._dbConn.close() - def getTasksAtState(self): - tasks = [] - tasks.append(ReadFixedDataTask(self)) # always - if ( self._state == self.STATE_EMPTY ): - tasks.append(CreateDbTask(self)) - tasks.append(CreateFixedTableTask(self)) - elif ( self._state == self.STATE_DB_ONLY ): - tasks.append(DropDbTask(self)) - tasks.append(CreateFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) - elif ( self._state == self.STATE_TABLE_ONLY ): - tasks.append(DropFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) - elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust - tasks.append(DropFixedTableTask(self)) - tasks.append(AddFixedDataTask(self)) + # May be slow, use cautionsly... 
+ def getTaskTypesAtState(self): + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks + firstTaskTypes = [] + for tc in allTaskClasses: + # t = tc(self) # create task object + if tc.canBeginFrom(self._state): + firstTaskTypes.append(tc) + # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones + taskTypes = firstTaskTypes.copy() # have to have these + for task1 in firstTaskTypes: # each task type gathered so far + endState = task1.getEndState() # figure the end state + if endState == None: + continue + for tc in allTaskClasses: # what task can further begin from there? + if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): + taskTypes.append(tc) # gather it + + if len(taskTypes) <= 0: + raise RuntimeError("No suitable task types found for state: {}".format(self._state)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes)) + return taskTypes + + # tasks.append(ReadFixedDataTask(self)) # always for everybody + # if ( self._state == self.STATE_EMPTY ): + # tasks.append(CreateDbTask(self)) + # tasks.append(CreateFixedTableTask(self)) + # elif ( self._state == self.STATE_DB_ONLY ): + # tasks.append(DropDbTask(self)) + # tasks.append(CreateFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # elif ( self._state == self.STATE_TABLE_ONLY ): + # tasks.append(DropFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust + # tasks.append(DropFixedTableTask(self)) + # tasks.append(AddFixedDataTask(self)) + # else: + # raise RuntimeError("Unexpected DbState state: {}".format(self._state)) + # return tasks + + def pickTaskType(self): + taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state + weights = [] + for tt in taskTypes: + endState = tt.getEndState() + if endState != None : + weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method + else: + weights.append(10) # read data task, default to 10: TODO: change to a constant + i = self._weighted_choice_sub(weights) + # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + return taskTypes[i] + + def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ + rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? + for i, w in enumerate(weights): + rnd -= w + if rnd < 0: + return i + + def _findCurrentState(self): + dbc = self._dbConn + ts = time.time() + if dbc.query("show databases") == 0 : # no database?! 
+ # logger.debug("Found EMPTY state") + logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) + return StateEmpty() + dbc.execute("use db") # did not do this when openning connection + if dbc.query("show tables") == 0 : # no tables + # logger.debug("Found DB ONLY state") + logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) + return StateDbOnly() + if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data + # logger.debug("Found TABLE_ONLY state") + logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) + return StateSuperTableOnly() else: - raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - return tasks - + # logger.debug("Found HAS_DATA state") + logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) + return StateHasData() + def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty return # do nothing - if ( self._state == self.STATE_EMPTY ): - # self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table - if ( self.hasSuccess(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class - self._state = self.STATE_DB_ONLY - if ( self.hasSuccess(tasks, CreateFixedTableTask )): - self._state = self.STATE_TABLE_ONLY - # else: # no successful table creation, not much we can say, as it is step 2 - else: # did not create db - self.assertNoTask(tasks, CreateDbTask) # because we did not have such task - # self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task - self.assertNoSuccess(tasks, CreateFixedTableTask) - - elif ( self._state == self.STATE_DB_ONLY ): - self.assertAtMostOneSuccess(tasks, DropDbTask) - self.assertIfExistThenSuccess(tasks, DropDbTask) - self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) - # Nothing to be said about adding data task - if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB - # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - self.assertAtMostOneSuccess(tasks, DropDbTask) - self._state = self.STATE_EMPTY - elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success - # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful - self.assertNoTask(tasks, DropDbTask) # should have have tried - if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet - # can't say there's add-data attempts, since they may all fail - self._state = self.STATE_TABLE_ONLY - else: - self._state = self.STATE_HAS_DATA - else: # no success in dropping db tasks, no success in create fixed table, not acceptable - raise RuntimeError("Unexpected no-success scenario") - - elif ( self._state == self.STATE_TABLE_ONLY ): - if ( self.hasSuccess(tasks, DropFixedTableTask) ): - self.assertAtMostOneSuccess(tasks, DropFixedTableTask) - self._state = self.STATE_DB_ONLY - elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table - self.assertNoTask(tasks, DropFixedTableTask) - self._state = self.STATE_HAS_DATA - else: # did not drop table, did not insert data, that is impossible - raise RuntimeError("Unexpected no-success scenarios") - - elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust - if ( self.hasSuccess(tasks, DropFixedTableTask) 
): - self.assertAtMostOneSuccess(tasks, DropFixedTableTask) - self._state = self.STATE_DB_ONLY - elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table - self.assertNoTask(tasks, DropFixedTableTask) - self._state = self.STATE_HAS_DATA - else: # did not drop table, did not insert data, that is impossible - raise RuntimeError("Unexpected no-success scenarios") - else: - raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - logger.debug("New DB state is: {}".format(self._state)) + self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps - def assertAtMostOneSuccess(self, tasks, cls): - sCnt = 0 - for task in tasks : - if not isinstance(task, cls): - continue - if task.isSuccess(): - task.logDebug("Task success found") - sCnt += 1 - if ( sCnt >= 2 ): - raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + # Generic Checks, first based on the start state + if self._state.canCreateDb(): + self._state.assertIfExistThenSuccess(tasks, CreateDbTask) + # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops - def assertIfExistThenSuccess(self, tasks, cls): - sCnt = 0 - exists = False - for task in tasks : - if not isinstance(task, cls): - continue - exists = True # we have a valid instance - if task.isSuccess(): - sCnt += 1 - if ( exists and sCnt <= 0 ): - raise RuntimeError("Unexpected zero success for task: {}".format(cls)) + if self._state.canDropDb(): + self._state.assertIfExistThenSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop - def assertNoTask(self, tasks, cls): - for task in tasks : - if isinstance(task, cls): - raise RuntimeError("Unexpected task: {}".format(cls)) + # if self._state.canCreateFixedTable(): + # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create - def assertNoSuccess(self, tasks, cls): - for task in tasks : - if isinstance(task, cls): - if task.isSuccess(): - raise RuntimeError("Unexpected successful task: {}".format(cls)) + # if self._state.canDropFixedTable(): + # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped + # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop - def hasSuccess(self, tasks, cls): - for task in tasks : - if not isinstance(task, cls): - continue - if task.isSuccess(): - return True - return False + # if self._state.canAddData(): + # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually + + # if self._state.canReadData(): + # Nothing for sure + + newState = self._findCurrentState() + logger.debug("[STT] New DB state determined: {}".format(newState)) + self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? 
+ self._state = newState class TaskExecutor(): def __init__(self, curStep): @@ -614,10 +895,11 @@ class Task(): @classmethod def allocTaskNum(cls): - cls.taskSn += 1 - return cls.taskSn + Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy + # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) + return Task.taskSn - def __init__(self, dbState: DbState): + def __init__(self, dbState: DbState, execStats: ExecutionStats): self._dbState = dbState self._workerThread = None self._err = None @@ -626,19 +908,22 @@ class Task(): # Assign an incremental task serial number self._taskNum = self.allocTaskNum() + # logger.debug("Creating new task {}...".format(self._taskNum)) + + self._execStats = execStats def isSuccess(self): return self._err == None - def clone(self): - newTask = self.__class__(self._dbState) + def clone(self): # TODO: why do we need this again? + newTask = self.__class__(self._dbState, self._execStats) return newTask def logDebug(self, msg): - self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg)) def logInfo(self, msg): - self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg)) def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) @@ -652,29 +937,228 @@ class Task(): self.logDebug("[-] executing task {}...".format(self.__class__.__name__)) self._err = None + self._execStats.beginTaskType(self.__class__.__name__) # mark beginning try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - self.logDebug("[=]Taos Execution exception: {0}".format(err)) + self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err)) self._err = err except: - self.logDebug("[=]Unexpected exception") + self.logDebug("[=] Unexpected exception") raise + self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) - self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) + self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) + self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. 
def execSql(self, sql): return self._dbState.execute(sql) -class CreateDbTask(Task): + +class ExecutionStats: + def __init__(self): + self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task + self._tasksInProgress = 0 + self._lock = threading.Lock() + self._firstTaskStartTime = None + self._execStartTime = None + self._elapsedTime = 0.0 # total elapsed time + self._accRunTime = 0.0 # accumulated run time + + self._failed = False + self._failureReason = None + + def startExec(self): + self._execStartTime = time.time() + + def endExec(self): + self._elapsedTime = time.time() - self._execStartTime + + def incExecCount(self, klassName, isSuccess): # TODO: add a lock here + if klassName not in self._execTimes: + self._execTimes[klassName] = [0, 0] + t = self._execTimes[klassName] # tuple for the data + t[0] += 1 # index 0 has the "total" execution times + if isSuccess: + t[1] += 1 # index 1 has the "success" execution times + + def beginTaskType(self, klassName): + with self._lock: + if self._tasksInProgress == 0 : # starting a new round + self._firstTaskStartTime = time.time() # I am now the first task + self._tasksInProgress += 1 + + def endTaskType(self, klassName, isSuccess): + with self._lock: + self._tasksInProgress -= 1 + if self._tasksInProgress == 0 : # all tasks have stopped + self._accRunTime += (time.time() - self._firstTaskStartTime) + self._firstTaskStartTime = None + + def registerFailure(self, reason): + self._failed = True + self._failureReason = reason + + def logStats(self): + logger.info("----------------------------------------------------------------------") + logger.info("| Crash_Gen test {}, with the following stats:". + format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED")) + logger.info("| Task Execution Times (success/total):") + execTimesAny = 0 + for k, n in self._execTimes.items(): + execTimesAny += n[0] + logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0])) + + logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny)) + logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress)) + logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime)) + logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny)) + logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime)) + logger.info("----------------------------------------------------------------------") + + + +class StateTransitionTask(Task): + # @classmethod + # def getAllTaskClasses(cls): # static + # return cls.__subclasses__() + @classmethod + def getInfo(cls): # each sub class should supply their own information + raise RuntimeError("Overriding method expected") + + # @classmethod + # def getBeginStates(cls): + # return cls.getInfo()[0] + + @classmethod + def getEndState(cls): # returning the class name + return cls.getInfo()[0] + + @classmethod + def canBeginFrom(cls, state: AnyState): + # return state.getValue() in cls.getBeginStates() + raise RuntimeError("must be overriden") + + def execute(self, wt: WorkerThread): + super().execute(wt) + + + +class CreateDbTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_EMPTY], # can begin from + StateDbOnly() # end state + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateDb() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - 
wt.execSql("create database db") + wt.execSql("create database db") + +class DropDbTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + StateEmpty() + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropDb() -class DropDbTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("drop database db") + logger.debug("[OPS] database dropped at {}".format(time.time())) + +class CreateFixedSuperTableTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_DB_ONLY], + StateSuperTableOnly() + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateFixedSuperTable() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedSuperTableName() + wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(20), f float) ".format(tblName)) + # No need to create the regular tables, INSERT will do that automatically + + +class ReadFixedDataTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + None # meaning doesn't affect state + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canReadData() -class CreateTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + sTbName = self._dbState.getFixedSuperTableName() + dbc = wt.getDbConn() + dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + rTables = dbc.getQueryResult() + # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + for rTbName in rTables : # regular tables + dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + + # tdSql.query(" cars where tbname in ('carzero', 'carone')") + +class DropFixedSuperTableTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + StateDbOnly() # meaning doesn't affect state + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropFixedSuperTable() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedSuperTableName() + wt.execSql("drop table db.{}".format(tblName)) + +class AddFixedDataTask(StateTransitionTask): + @classmethod + def getInfo(cls): + return [ + # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], + StateHasData() + ] + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canAddData() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + ds = self._dbState + wt.execSql("use db") # TODO: seems to be an INSERT bug to require this + for i in range(10): # 0 to 9 + sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( + i, + ds.getFixedSuperTableName(), + ds.getNextBinary(), ds.getNextFloat(), + ds.getNextTick(), ds.getNextInt()) + wt.execSql(sql) + + +#---------- Non State-Transition Related Tasks ----------# + +class CreateTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tIndex = self._dbState.addTable() self.logDebug("Creating a table {} ...".format(tIndex)) @@ -682,17 +1166,6 @@ class CreateTableTask(Task): self.logDebug("Table {} created.".format(tIndex)) self._dbState.releaseTable(tIndex) -class CreateFixedTableTask(Task): - def _executeInternal(self, te: 
-        tblName = self._dbState.getFixedTableName()
-        wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
-
-class ReadFixedDataTask(Task):
-    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
-        tblName = self._dbState.getFixedTableName()
-        self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later
-        # tdSql.query(" cars where tbname in ('carzero', 'carone')")
-
 class DropTableTask(Task):
     def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
         tableName = self._dbState.getTableNameToDelete()
@@ -702,10 +1175,7 @@ class DropTableTask(Task):
         self.logInfo("Dropping a table db.{} ...".format(tableName))
         wt.execSql("drop table db.{}".format(tableName))
 
-class DropFixedTableTask(Task):
-    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
-        tblName = self._dbState.getFixedTableName()
-        wt.execSql("drop table db.{}".format(tblName))
+
 
 class AddDataTask(Task):
     def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
@@ -716,16 +1186,11 @@ class AddDataTask(Task):
             self.logInfo("No table found to add data, skipping...")
             return
         sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
-        self.logDebug("Executing SQL: {}".format(sql))
+        self.logDebug("[SQL] Executing SQL: {}".format(sql))
         wt.execSql(sql)
         ds.releaseTable(tIndex)
-        self.logDebug("Finished adding data")
+        self.logDebug("[OPS] Finished adding data")
 
-class AddFixedDataTask(Task):
-    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
-        ds = self._dbState
-        sql = "insert into db.table_{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
-        wt.execSql(sql)
 
 # Deterministic random number generator
 class Dice():
@@ -760,28 +1225,45 @@ class Dice():
 
 
 # Anyone needing to carry out work should simply come here
-class WorkDispatcher():
-    def __init__(self, dbState):
-        # self.totalNumMethods = 2
-        self.tasks = [
-            CreateTableTask(dbState),
-            DropTableTask(dbState),
-            AddDataTask(dbState),
-        ]
+# class WorkDispatcher():
+#     def __init__(self, dbState):
+#         # self.totalNumMethods = 2
+#         self.tasks = [
+#             # CreateTableTask(dbState), # Obsolete
+#             # DropTableTask(dbState),
+#             # AddDataTask(dbState),
+#         ]
+
+#     def throwDice(self):
+#         max = len(self.tasks) - 1
+#         dRes = random.randint(0, max)
+#         # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
+#         return dRes
+
+#     def pickTask(self):
+#         dice = self.throwDice()
+#         return self.tasks[dice]
+
+#     def doWork(self, workerThread):
+#         task = self.pickTask()
+#         task.execute(workerThread)
+
+class LoggingFilter(logging.Filter):
+    def filter(self, record: logging.LogRecord):
+        if record.levelno >= logging.INFO:
+            return True # INFO or above is always logged
+
+        msg = record.msg
+        # print("type = {}, value={}".format(type(msg), msg))
+        # sys.exit()
+
+        # Commenting out below to adjust...
+
+        # if msg.startswith("[TRD]"):
+        #     return False
+        return True
 
-    def throwDice(self):
-        max = len(self.tasks) - 1
-        dRes = random.randint(0, max)
-        # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
-        return dRes
-
-    def pickTask(self):
-        dice = self.throwDice()
-        return self.tasks[dice]
-
-    def doWork(self, workerThread):
-        task = self.pickTask()
-        task.execute(workerThread)
+
 
 def main():
     # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
@@ -810,9 +1292,12 @@ def main():
         sys.exit()
 
     global logger
-    logger = logging.getLogger('myApp')
+    logger = logging.getLogger('CrashGen')
+    logger.addFilter(LoggingFilter())
     if ( gConfig.debug ):
         logger.setLevel(logging.DEBUG) # default seems to be INFO
+    else:
+        logger.setLevel(logging.INFO)
     ch = logging.StreamHandler()
     logger.addHandler(ch)
 
@@ -820,12 +1305,21 @@ def main():
     Dice.seed(0) # initial seeding of dice
     tc = ThreadCoordinator(
         ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
-        WorkDispatcher(dbState),
+        # WorkDispatcher(dbState), # Obsolete?
         dbState
         )
+
+    # Sandbox testing code
+    # dbc = dbState.getDbConn()
+    # while True:
+    #     rows = dbc.query("show databases")
+    #     print("Rows: {}, time={}".format(rows, time.time()))
+
     tc.run()
-    dbState.cleanUp()
-    logger.info("Finished running thread pool")
+    tc.logStats()
+    dbState.cleanUp()
+
+    # logger.info("Crash_Gen execution finished")
 
 if __name__ == "__main__":
     main()
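For illustration, here is a minimal sketch of how the StateTransitionTask and ExecutionStats pieces introduced by this patch could fit together at runtime: a coordinator asks each task class whether it may legally begin from the current database state, runs one eligible task, and records the outcome. The pickTaskClass/runOneStep helpers and the dbState.getState() call are illustrative assumptions for this sketch, not part of the patch.

# Illustrative sketch only -- not applied by this patch. Assumes the classes added
# above (StateTransitionTask and its subclasses, ExecutionStats) are in scope.
import random

def pickTaskClass(currentState):                      # hypothetical helper
    # A task class is eligible only if its precondition accepts the current DB state.
    eligible = [k for k in StateTransitionTask.__subclasses__()
                if k.canBeginFrom(currentState)]
    return random.choice(eligible) if eligible else None

def runOneStep(dbState, workerThread, stats):         # hypothetical driver-loop body
    taskClass = pickTaskClass(dbState.getState())     # getState() is assumed here
    if taskClass is None:
        return
    task = taskClass(dbState)                         # same constructor shape as CreateTableTask(dbState)
    stats.beginTaskType(taskClass.__name__)
    succeeded = True
    try:
        task.execute(workerThread)                    # drives the task's _executeInternal()
    except Exception:
        succeeded = False
    stats.incExecCount(taskClass.__name__, succeeded)
    stats.endTaskType(taskClass.__name__, succeeded)

Because each concrete task advertises both its precondition (canBeginFrom) and its end state (getEndState), a coordinator written along these lines can pick only legal work and afterwards check that the database actually reached the advertised state.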