未验证 提交 02b51b0a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2080 from taosdata/feature/crash_gen

Feature/crash gen
......@@ -579,9 +579,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if ((pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH) &&
(pRes->code != TSDB_CODE_QUERY_CANCELLED && ((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) ||
(pRes->code == TSDB_CODE_SUCCESS && pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)))) {
pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS &&
((pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) ||
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows,
......
......@@ -1850,8 +1850,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) {
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
// todo handle error
STableMeta* pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup
assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
} else { // transfer the ownership of pTableMeta to the newly create sql object.
......
......@@ -220,6 +220,7 @@ typedef struct SAcctObj {
typedef struct {
int8_t type;
int32_t index;
char db[TSDB_DB_NAME_LEN + 1];
void * pIter;
int16_t numOfColumns;
......@@ -228,7 +229,6 @@ typedef struct {
int32_t numOfReads;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS];
void * signature;
uint16_t payloadLen;
char payload[];
} SShowObj;
......
......@@ -27,7 +27,8 @@ void mnodeCleanupVgroups();
SVgObj *mnodeGetVgroup(int32_t vgId);
void mnodeIncVgroupRef(SVgObj *pVgroup);
void mnodeDecVgroupRef(SVgObj *pVgroup);
void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg);
void mnodeDropAllDbVgroups(SDbObj *pDropDb);
void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb);
void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode);
void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb);
......
......@@ -81,10 +81,10 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
SDbObj *pDb = pOper->pObj;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
mnodeDropDbFromAcct(pAcct, pDb);
mnodeDropAllChildTables(pDb);
mnodeDropAllSuperTables(pDb);
mnodeDropAllDbVgroups(pDb, false);
mnodeDropAllDbVgroups(pDb);
mnodeDropDbFromAcct(pAcct, pDb);
mnodeDecAcctRef(pAcct);
return TSDB_CODE_SUCCESS;
......@@ -998,19 +998,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
return code;
}
#if 1
mnodeDropAllDbVgroups(pMsg->pDb, true);
#else
SVgObj *pVgroup = pMsg->pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgId:%d, will be dropped", pVgroup->vgId);
SMnodeMsg *newMsg = mnodeCloneMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mnodeDropVgroup(pVgroup, newMsg);
return;
}
#endif
mnodeSendDropAllDbVgroupsMsg(pMsg->pDb);
mTrace("db:%s, all vgroups is dropped", pMsg->pDb->name);
return mnodeDropDb(pMsg);
......
......@@ -47,13 +47,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
static void mnodeFreeShowObj(void *data);
static bool mnodeCheckShowObj(SShowObj *pShow);
static bool mnodeAccquireShowObj(SShowObj *pShow);
static bool mnodeCheckShowFinished(SShowObj *pShow);
static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size);
static void mnodeCleanupShowObj(void *pShow, bool forceRemove);
static void *mnodePutShowObj(SShowObj *pShow, int32_t size);
static void mnodeReleaseShowObj(void *pShow, bool forceRemove);
extern void *tsMnodeTmr;
static void *tsQhandleCache = NULL;
static void *tsMnodeShowCache = NULL;
static int32_t tsShowObjIndex = 0;
static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
......@@ -64,14 +65,15 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsQhandleCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj);
tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj);
return 0;
}
void mnodeCleanUpShow() {
if (tsQhandleCache != NULL) {
taosCacheCleanup(tsQhandleCache);
tsQhandleCache = NULL;
if (tsMnodeShowCache != NULL) {
mPrint("show cache is cleanup");
taosCacheCleanup(tsMnodeShowCache);
tsMnodeShowCache = NULL;
}
}
......@@ -118,13 +120,12 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen);
SShowObj *pShow = (SShowObj *) calloc(1, showObjSize);
pShow->signature = pShow;
pShow->type = pShowMsg->type;
pShow->payloadLen = htons(pShowMsg->payloadLen);
strcpy(pShow->db, pShowMsg->db);
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
pShow = mnodeSaveShowObj(pShow, showObjSize);
pShow = mnodePutShowObj(pShow, showObjSize);
if (pShow == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -132,21 +133,22 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SCMShowRsp *pShowRsp = rpcMallocCont(size);
if (pShowRsp == NULL) {
mnodeFreeShowObj(pShow);
mnodeReleaseShowObj(pShow, true);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pShowRsp->qhandle = htobe64((uint64_t) pShow);
mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type));
mTrace("%p, show type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type));
int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle);
if (code == 0) {
pMsg->rpcRsp.rsp = pShowRsp;
pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
mnodeReleaseShowObj(pShow, false);
return TSDB_CODE_SUCCESS;
} else {
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code));
mError("%p, show type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code));
rpcFreeCont(pShowRsp);
mnodeCleanupShowObj(pShow, true);
mnodeReleaseShowObj(pShow, true);
return code;
}
}
......@@ -159,22 +161,20 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type));
mTrace("%p, show type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type));
/*
* in case of server restart, apps may hold qhandle created by server before
* restart, which is actually invalid, therefore, signature check is required.
*/
if (!mnodeCheckShowObj(pShow)) {
mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pShow);
if (!mnodeAccquireShowObj(pShow)) {
mError("%p, show is invalid", pShow);
return TSDB_CODE_INVALID_QHANDLE;
}
if (mnodeCheckShowFinished(pShow)) {
mTrace("retrieve:%p, qhandle:%p already read finished, numOfReads:%d numOfRows:%d", pRetrieve, pShow, pShow->numOfReads, pShow->numOfRows);
mTrace("%p, show is already read finished, numOfReads:%d numOfRows:%d", pShow, pShow->numOfReads, pShow->numOfRows);
pShow->numOfReads = pShow->numOfRows;
//mnodeCleanupShowObj(pShow, true);
//return TSDB_CODE_SUCCESS;
}
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
......@@ -200,7 +200,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
if (rowsRead < 0) {
rpcFreeCont(pRsp);
mnodeCleanupShowObj(pShow, false);
mnodeReleaseShowObj(pShow, false);
assert(false);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
......@@ -211,12 +211,13 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = size;
if (rowsToRead == 0) {
mnodeCleanupShowObj(pShow, true);
if (rowsToRead == 0 || rowsRead == rowsToRead) {
pRsp->completed = 1;
mnodeReleaseShowObj(pShow, true);
} else {
mnodeCleanupShowObj(pShow, false);
mnodeReleaseShowObj(pShow, false);
}
return TSDB_CODE_SUCCESS;
}
......@@ -318,24 +319,29 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
return false;
}
static bool mnodeCheckShowObj(SShowObj *pShow) {
SShowObj *pSaved = taosCacheAcquireByData(tsQhandleCache, pShow);
static bool mnodeAccquireShowObj(SShowObj *pShow) {
char key[10];
sprintf(key, "%d", pShow->index);
SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key);
if (pSaved == pShow) {
mTrace("%p, show is accquired from cache", pShow);
return true;
} else {
mTrace("show:%p, is already released", pShow);
return false;
}
}
static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) {
if (tsQhandleCache != NULL) {
char key[24];
sprintf(key, "show:%p", pShow);
SShowObj *newQhandle = taosCachePut(tsQhandleCache, key, pShow, size, 60);
static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
if (tsMnodeShowCache != NULL) {
char key[10];
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
sprintf(key, "%d", pShow->index);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 60);
free(pShow);
mTrace("show:%p, is saved", newQhandle);
mTrace("%p, show is put into cache", newQhandle);
return newQhandle;
}
......@@ -345,10 +351,10 @@ static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) {
static void mnodeFreeShowObj(void *data) {
SShowObj *pShow = data;
sdbFreeIter(pShow->pIter);
mTrace("show:%p, is destroyed", pShow);
mTrace("%p, show is destroyed", pShow);
}
static void mnodeCleanupShowObj(void *pShow, bool forceRemove) {
mTrace("show:%p, is released, force:%s", pShow, forceRemove ? "true" : "false");
taosCacheRelease(tsQhandleCache, &pShow, forceRemove);
static void mnodeReleaseShowObj(void *pShow, bool forceRemove) {
mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false");
taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove);
}
......@@ -148,37 +148,30 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
mnodeDecVgroupRef(pVgroup);
SDbObj *pDb = mnodeGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("ctable:%s, vgId:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB;
}
mnodeDecDbRef(pDb);
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
if (pAcct == NULL) {
mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct);
return TSDB_CODE_INVALID_ACCT;
}
mnodeDecAcctRef(pAcct);
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
SAcctObj *pAcct = NULL;
pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup != NULL) pDb = mnodeGetDb(pVgroup->dbName);
if (pDb != NULL) pAcct = mnodeGetAcct(pDb->acct);
if (pTable->info.type == TSDB_CHILD_TABLE) {
grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
mnodeRemoveTableFromStable(pTable->superTable, pTable);
mnodeDecTableRef(pTable->superTable);
} else {
grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
}
mnodeRemoveTableFromDb(pDb);
mnodeRemoveTableFromVgroup(pVgroup, pTable);
if (pDb != NULL) mnodeRemoveTableFromDb(pDb);
if (pVgroup != NULL) mnodeRemoveTableFromVgroup(pVgroup, pTable);
mnodeDecVgroupRef(pVgroup);
mnodeDecDbRef(pDb);
mnodeDecAcctRef(pAcct);
return TSDB_CODE_SUCCESS;
}
......@@ -693,10 +686,10 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
}
if (pCreate->numOfTags != 0) {
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
mTrace("table:%s, create stable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateSuperTableMsg(pMsg);
} else {
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
mTrace("table:%s, create ctable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateChildTableMsg(pMsg);
}
}
......@@ -1276,9 +1269,9 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable->vgHash != NULL) {
if (pTable != NULL && pTable->vgHash != NULL) {
contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo));
}
}
mnodeDecTableRef(pTable);
}
......@@ -1287,12 +1280,23 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pRsp->numOfTables = htonl(numOfTable);
char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg);
pRsp->numOfTables = 0;
char *msg = (char *)pRsp + sizeof(SCMSTableVgroupRspMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) {
mError("stable:%s, not exist while get stable vgroup info", stableName);
mnodeDecTableRef(pTable);
continue;
}
if (pTable->vgHash == NULL) {
mError("stable:%s, not vgroup exist while get stable vgroup info", stableName);
mnodeDecTableRef(pTable);
continue;
}
SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg;
SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash);
......@@ -1318,17 +1322,25 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
}
taosHashDestroyIter(pIter);
mnodeDecTableRef(pTable);
pVgroupInfo->numOfVgroups = htonl(vgSize);
// one table is done, try the next table
msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo);
pRsp->numOfTables++;
}
pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = msg - (char *)pRsp;
if (pRsp->numOfTables != numOfTable) {
rpcFreeCont(pRsp);
return TSDB_CODE_INVALID_TABLE_ID;
} else {
pRsp->numOfTables = htonl(pRsp->numOfTables);
pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = msg - (char *)pRsp;
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
}
static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) {
......@@ -1429,7 +1441,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb
SSuperTableObj *pSuperTable = mnodeGetSuperTable(pTagData->name);
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name);
free(pTable);
mnodeDestroyChildTable(pTable);
terrno = TSDB_CODE_INVALID_TABLE_ID;
return NULL;
}
......@@ -1738,7 +1750,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STagData* pTag = (STagData*)pInfo->tags;
STagData *pTag = (STagData *)pInfo->tags;
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen);
SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
......@@ -1754,14 +1766,13 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
pCreateMsg->contLen = htonl(contLen);
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg));
mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, ((STagData *)(pCreateMsg->schema))->name);
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
pMsg->rpcMsg.pCont = pCreateMsg;
pMsg->rpcMsg.contLen = contLen;
mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, pInfo->tags);
return TSDB_CODE_ACTION_NEED_REPROCESSED;
}
......
......@@ -784,7 +784,7 @@ void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb) {
mPrint("db:%s, all vgroups is updated in sdb", pAlterDb->name);
}
void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
void * pIter = NULL;
int32_t numOfVgroups = 0;
SVgObj *pVgroup = NULL;
......@@ -802,10 +802,6 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
};
sdbDeleteRow(&oper);
numOfVgroups++;
if (sendMsg) {
mnodeSendDropVgroupMsg(pVgroup, NULL);
}
}
mnodeDecVgroupRef(pVgroup);
......@@ -815,3 +811,25 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups);
}
void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb) {
void * pIter = NULL;
int32_t numOfVgroups = 0;
SVgObj *pVgroup = NULL;
mPrint("db:%s, all vgroups will be dropped in dnode", pDropDb->name);
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDropDb) {
mnodeSendDropVgroupMsg(pVgroup, NULL);
}
mnodeDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
mPrint("db:%s, all vgroups:%d drop msg is sent to dnode", pDropDb->name, numOfVgroups);
}
\ No newline at end of file
......@@ -14,6 +14,7 @@
from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
import sys
import traceback
# Require Python 3
if sys.version_info[0] < 3:
raise Exception("Must be using Python 3")
......@@ -24,11 +25,13 @@ import copy
import threading
import random
import time
import logging
import datetime
import textwrap
from typing import List
from typing import Dict
from util.log import *
from util.dnodes import *
......@@ -45,6 +48,14 @@ logger = None
def runThread(wt: WorkerThread):
wt.run()
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self.errno = errno
def __str__(self):
return self.msg
class WorkerThread:
def __init__(self, pool: ThreadPool, tid,
tc: ThreadCoordinator,
......@@ -63,10 +74,10 @@ class WorkerThread:
self._dbConn = DbConn()
def logDebug(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg))
logger.debug(" TRD[{}] {}".format(self._tid, msg))
def logInfo(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg))
logger.info(" TRD[{}] {}".format(self._tid, msg))
def getTaskExecutor(self):
......@@ -95,15 +106,19 @@ class WorkerThread:
while True:
tc = self._tc # Thread Coordinator, the overall master
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
logger.debug("Thread task loop exited barrier...")
logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
self.crossStepGate() # then per-thread gate, after being tapped
logger.debug("Thread task loop exited step gate...")
logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
if not self._tc.isRunning():
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break
logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask()
logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
task.execute(self)
tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
def verifyThreadSelf(self): # ensure we are called by this own thread
if ( threading.get_ident() != self._thread.ident ):
......@@ -123,7 +138,7 @@ class WorkerThread:
self.verifyThreadSelf() # only allowed by ourselves
# Wait again at the "gate", waiting to be "tapped"
# logger.debug("Worker thread {} about to cross the step gate".format(self._tid))
logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
self._stepGate.wait()
self._stepGate.clear()
......@@ -133,33 +148,40 @@ class WorkerThread:
self.verifyThreadAlive()
self.verifyThreadMain() # only allowed for main thread
logger.debug("Tapping worker thread {}".format(self._tid))
logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit
def execSql(self, sql): # not "execute", since we are out side the DB context
def execSql(self, sql): # TODO: expose DbConn directly
if ( gConfig.per_thread_db_connection ):
return self._dbConn.execute(sql)
else:
return self._tc.getDbState().getDbConn().execute(sql)
def querySql(self, sql): # not "execute", since we are out side the DB context
def getDbConn(self):
if ( gConfig.per_thread_db_connection ):
return self._dbConn.query(sql)
return self._dbConn
else:
return self._tc.getDbState().getDbConn().query(sql)
return self._tc.getDbState().getDbConn()
# def querySql(self, sql): # not "execute", since we are out side the DB context
# if ( gConfig.per_thread_db_connection ):
# return self._dbConn.query(sql)
# else:
# return self._tc.getDbState().getDbConn().query(sql)
class ThreadCoordinator:
def __init__(self, pool, wd: WorkDispatcher, dbState):
def __init__(self, pool, dbState):
self._curStep = -1 # first step is 0
self._pool = pool
self._wd = wd
# self._wd = wd
self._te = None # prepare for every new step
self._dbState = dbState
self._executedTasks: List[Task] = [] # in a given step
self._lock = threading.RLock() # sync access for a few things
self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats()
def getTaskExecutor(self):
return self._te
......@@ -176,41 +198,63 @@ class ThreadCoordinator:
# Coordinate all threads step by step
self._curStep = -1 # not started yet
maxSteps = gConfig.max_steps # type: ignore
while(self._curStep < maxSteps):
print(".", end="", flush=True)
logger.debug("Main thread going to sleep")
self._execStats.startExec() # start the stop watch
failed = False
while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9
if not gConfig.debug:
print(".", end="", flush=True) # print this only if we are not in debug mode
logger.debug("[TRD] Main thread going to sleep")
# Now ready to enter a step
self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
try:
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
except taos.error.ProgrammingError as err:
if ( err.msg == 'network unavailable' ): # broken DB connection
logger.info("DB connection broken, execution failed")
traceback.print_stack()
failed = True
self._te = None # Not running any more
self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
else:
raise
finally:
pass
self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step
logger.info("<-- Step {} finished".format(self._curStep))
logger.debug("<-- Step {} finished".format(self._curStep))
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
# A new TE for the new step
self._te = TaskExecutor(self._curStep)
if not failed: # only if not failed
self._te = TaskExecutor(self._curStep)
logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
self.tapAllThreads()
logger.debug("Main thread ready to finish up...")
self.crossStepBarrier() # Cross it one last time, after all threads finish
self._stepBarrier.reset()
logger.debug("Main thread in exclusive zone...")
self._te = None # No more executor, time to end
logger.debug("Main thread tapping all threads one last time...")
self.tapAllThreads() # Let the threads run one last time
if not failed: # only in regular situations
self.crossStepBarrier() # Cross it one last time, after all threads finish
self._stepBarrier.reset()
logger.debug("Main thread in exclusive zone...")
self._te = None # No more executor, time to end
logger.debug("Main thread tapping all threads one last time...")
self.tapAllThreads() # Let the threads run one last time
logger.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish
logger.info("All worker thread finished")
self._execStats.endExec()
logger.info("All threads finished")
print("\r\nFinished")
def logStats(self):
self._execStats.logStats()
def tapAllThreads(self): # in a deterministic manner
wakeSeq = []
......@@ -219,7 +263,7 @@ class ThreadCoordinator:
wakeSeq.append(i)
else:
wakeSeq.insert(0, i)
logger.info("Waking up threads: {}".format(str(wakeSeq)))
logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq)))
# TODO: set dice seed to a deterministic value
for i in wakeSeq:
self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
......@@ -233,11 +277,15 @@ class ThreadCoordinator:
raise RuntimeError("Cannot fetch task when not running")
# return self._wd.pickTask()
# Alternatively, let's ask the DbState for the appropriate task
dbState = self.getDbState()
tasks = dbState.getTasksAtState()
i = Dice.throw(len(tasks))
# return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
return tasks[i].clone()
# dbState = self.getDbState()
# tasks = dbState.getTasksAtState() # TODO: create every time?
# nTasks = len(tasks)
# i = Dice.throw(nTasks)
# logger.debug(" (dice:{}/{}) ".format(i, nTasks))
# # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
# return tasks[i].clone() # TODO: still necessary?
taskType = self.getDbState().pickTaskType() # pick a task type for current state
return taskType(self.getDbState(), self._execStats) # create a task from it
def resetExecutedTasks(self):
self._executedTasks = [] # should be under single thread
......@@ -253,7 +301,7 @@ class ThreadPool:
self.maxSteps = maxSteps
self.funcSequencer = funcSequencer
# Internal class variables
self.dispatcher = WorkDispatcher(dbState)
# self.dispatcher = WorkDispatcher(dbState) # Obsolete?
self.curStep = 0
self.threadList = []
# self.stepGate = threading.Condition() # Gate to hold/sync all threads
......@@ -362,7 +410,7 @@ class DbConn:
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db')
# self._cursor.execute('use db') # note we do this in _findCurrenState
# Open connection
self._tdSql = TDSql()
......@@ -390,28 +438,265 @@ class DbConn:
def execute(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot execute database commands until connection is open")
return self._tdSql.execute(sql)
logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.execute(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql) -> int : # return number of rows retrieved
def query(self, sql) : # return rows affected
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
return self._tdSql.query(sql)
logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.query(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult
def getQueryResult(self):
return self._tdSql.queryResult
# State of the database as we believe it to be
class DbState():
def _queryAny(self, sql) : # actual query result as an int
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
tSql = self._tdSql
nRows = tSql.query(sql)
if nRows != 1 :
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
if tSql.queryRows != 1 or tSql.queryCols != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return tSql.queryResult[0][0]
def queryScalar(self, sql) -> int :
return self._queryAny(sql)
def queryString(self, sql) -> str :
return self._queryAny(sql)
class AnyState:
STATE_INVALID = -1
STATE_EMPTY = 1 # nothing there, no even a DB
STATE_DB_ONLY = 2 # we have a DB, but nothing else
STATE_TABLE_ONLY = 3 # we have a table, but totally empty
STATE_HAS_DATA = 4 # we have some data in the table
STATE_EMPTY = 0 # nothing there, no even a DB
STATE_DB_ONLY = 1 # we have a DB, but nothing else
STATE_TABLE_ONLY = 2 # we have a table, but totally empty
STATE_HAS_DATA = 3 # we have some data in the table
_stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]
STATE_VAL_IDX = 0
CAN_CREATE_DB = 1
CAN_DROP_DB = 2
CAN_CREATE_FIXED_SUPER_TABLE = 3
CAN_DROP_FIXED_SUPER_TABLE = 4
CAN_ADD_DATA = 5
CAN_READ_DATA = 6
def __init__(self):
self._info = self.getInfo()
def __str__(self):
return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] # -1 hack to accomodate the STATE_INVALID case
def getInfo(self):
raise RuntimeError("Must be overriden by child classes")
def equals(self, other):
if isinstance(other, int):
return self.getValIndex() == other
elif isinstance(other, AnyState):
return self.getValIndex() == other.getValIndex()
else:
raise RuntimeError("Unexpected comparison, type = {}".format(type(other)))
def verifyTasksToState(self, tasks, newState):
raise RuntimeError("Must be overriden by child classes")
def getValIndex(self):
return self._info[self.STATE_VAL_IDX]
def getValue(self):
return self._info[self.STATE_VAL_IDX]
def canCreateDb(self):
return self._info[self.CAN_CREATE_DB]
def canDropDb(self):
return self._info[self.CAN_DROP_DB]
def canCreateFixedSuperTable(self):
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedSuperTable(self):
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
def canAddData(self):
return self._info[self.CAN_ADD_DATA]
def canReadData(self):
return self._info[self.CAN_READ_DATA]
def assertAtMostOneSuccess(self, tasks, cls):
sCnt = 0
for task in tasks :
if not isinstance(task, cls):
continue
if task.isSuccess():
# task.logDebug("Task success found")
sCnt += 1
if ( sCnt >= 2 ):
raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))
def assertIfExistThenSuccess(self, tasks, cls):
sCnt = 0
exists = False
for task in tasks :
if not isinstance(task, cls):
continue
exists = True # we have a valid instance
if task.isSuccess():
sCnt += 1
if ( exists and sCnt <= 0 ):
raise RuntimeError("Unexpected zero success for task: {}".format(cls))
def assertNoTask(self, tasks, cls):
for task in tasks :
if isinstance(task, cls):
raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))
def assertNoSuccess(self, tasks, cls):
for task in tasks :
if isinstance(task, cls):
if task.isSuccess():
raise RuntimeError("Unexpected successful task: {}".format(cls))
def hasSuccess(self, tasks, cls):
for task in tasks :
if not isinstance(task, cls):
continue
if task.isSuccess():
return True
return False
def hasTask(self, tasks, cls):
for task in tasks :
if isinstance(task, cls):
return True
return False
class StateInvalid(AnyState):
def getInfo(self):
return [
self.STATE_INVALID,
False, False, # can create/drop Db
False, False, # can create/drop fixed table
False, False, # can insert/read data with fixed table
]
# def verifyTasksToState(self, tasks, newState):
class StateEmpty(AnyState):
def getInfo(self):
return [
self.STATE_EMPTY,
True, False, # can create/drop Db
False, False, # can create/drop fixed table
False, False, # can insert/read data with fixed table
]
def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB
if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks
self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers
class StateDbOnly(AnyState):
def getInfo(self):
return [
self.STATE_DB_ONLY,
False, True,
True, False,
False, False,
]
def verifyTasksToState(self, tasks, newState):
self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases
self.assertIfExistThenSuccess(tasks, DropDbTask)
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases
# Nothing to be said about adding data task
if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY
elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful
self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail
# self._state = self.STATE_TABLE_ONLY
# else:
# self._state = self.STATE_HAS_DATA
# What about AddFixedData?
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ):
# self._state = self.STATE_HAS_DATA
# else: # no success in dropping db tasks, no success in create fixed table? read data should also fail
# # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks,
# self._state = self.STATE_DB_ONLY # no change
class StateSuperTableOnly(AnyState):
def getInfo(self):
return [
self.STATE_TABLE_ONLY,
False, True,
False, True,
True, True,
]
def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table
self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask)
# self._state = self.STATE_DB_ONLY
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
# self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases
# self._state = self.STATE_HAS_DATA
# elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data
# self.assertNoTask(tasks, DropFixedTableTask)
# self.assertNoTask(tasks, AddFixedDataTask)
# self._state = self.STATE_TABLE_ONLY # no change
# else: # did not drop table, did not insert data, did not read successfully, that is impossible
# raise RuntimeError("Unexpected no-success scenarios")
# TODO: need to revamp!!
class StateHasData(AnyState):
def getInfo(self):
return [
self.STATE_HAS_DATA,
False, True,
False, True,
True, True,
]
def verifyTasksToState(self, tasks, newState):
if ( newState.equals(AnyState.STATE_EMPTY) ):
self.hasSuccess(tasks, DropDbTask)
self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy
elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only
if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task
self.assertNoTask(tasks, DropDbTask) # we must have drop_db task
self.hasSuccess(tasks, DropFixedSuperTableTask)
self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
self.assertNoTask(tasks, DropDbTask)
self.assertNoTask(tasks, DropFixedSuperTableTask)
self.assertNoTask(tasks, AddFixedDataTask)
# self.hasSuccess(tasks, DeleteDataTasks)
else:
self.assertNoTask(tasks, DropDbTask)
self.assertNoTask(tasks, DropFixedSuperTableTask)
self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
# State of the database as we believe it to be
class DbState():
def __init__(self):
self.tableNumQueue = LinearQueue()
self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
self._lastInt = 0 # next one is initial integer
self._lock = threading.RLock()
self._state = self.STATE_INVALID
self._state = StateInvalid() # starting state
self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
# self.openDbServerConnection()
self._dbConn = DbConn()
......@@ -425,10 +710,10 @@ class DbState():
else:
raise
except:
print("[=]Unexpected exception")
print("[=] Unexpected exception")
raise
self._dbConn.resetDb() # drop and recreate DB
self._state = self.STATE_EMPTY # initial state, the result of above
self._state = StateEmpty() # initial state, the result of above
def getDbConn(self):
return self._dbConn
......@@ -441,8 +726,8 @@ class DbState():
tIndex = self.tableNumQueue.push()
return tIndex
def getFixedTableName(self):
return "fixed_table"
def getFixedSuperTableName(self):
return "fs_table"
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
......@@ -456,6 +741,12 @@ class DbState():
with self._lock:
self._lastInt += 1
return self._lastInt
def getNextBinary(self):
return "Los_Angeles_{}".format(self.getNextInt())
def getNextFloat(self):
return 0.9 + self.getNextInt()
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
......@@ -464,134 +755,124 @@ class DbState():
return "table_{}".format(tblNum)
def execSql(self, sql): # using the main DB connection
return self._dbConn.execute(sql)
def cleanUp(self):
self._dbConn.close()
def getTasksAtState(self):
tasks = []
tasks.append(ReadFixedDataTask(self)) # always
if ( self._state == self.STATE_EMPTY ):
tasks.append(CreateDbTask(self))
tasks.append(CreateFixedTableTask(self))
elif ( self._state == self.STATE_DB_ONLY ):
tasks.append(DropDbTask(self))
tasks.append(CreateFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
elif ( self._state == self.STATE_TABLE_ONLY ):
tasks.append(DropFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
tasks.append(DropFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
# May be slow, use cautionsly...
def getTaskTypesAtState(self):
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
firstTaskTypes = []
for tc in allTaskClasses:
# t = tc(self) # create task object
if tc.canBeginFrom(self._state):
firstTaskTypes.append(tc)
# now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones
taskTypes = firstTaskTypes.copy() # have to have these
for task1 in firstTaskTypes: # each task type gathered so far
endState = task1.getEndState() # figure the end state
if endState == None:
continue
for tc in allTaskClasses: # what task can further begin from there?
if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
taskTypes.append(tc) # gather it
if len(taskTypes) <= 0:
raise RuntimeError("No suitable task types found for state: {}".format(self._state))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes))
return taskTypes
# tasks.append(ReadFixedDataTask(self)) # always for everybody
# if ( self._state == self.STATE_EMPTY ):
# tasks.append(CreateDbTask(self))
# tasks.append(CreateFixedTableTask(self))
# elif ( self._state == self.STATE_DB_ONLY ):
# tasks.append(DropDbTask(self))
# tasks.append(CreateFixedTableTask(self))
# tasks.append(AddFixedDataTask(self))
# elif ( self._state == self.STATE_TABLE_ONLY ):
# tasks.append(DropFixedTableTask(self))
# tasks.append(AddFixedDataTask(self))
# elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
# tasks.append(DropFixedTableTask(self))
# tasks.append(AddFixedDataTask(self))
# else:
# raise RuntimeError("Unexpected DbState state: {}".format(self._state))
# return tasks
def pickTaskType(self):
taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
weights = []
for tt in taskTypes:
endState = tt.getEndState()
if endState != None :
weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
else:
weights.append(10) # read data task, default to 10: TODO: change to a constant
i = self._weighted_choice_sub(weights)
# logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
return taskTypes[i]
def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
for i, w in enumerate(weights):
rnd -= w
if rnd < 0:
return i
def _findCurrentState(self):
dbc = self._dbConn
ts = time.time()
if dbc.query("show databases") == 0 : # no database?!
# logger.debug("Found EMPTY state")
logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty()
dbc.execute("use db") # did not do this when openning connection
if dbc.query("show tables") == 0 : # no tables
# logger.debug("Found DB ONLY state")
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly()
if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data
# logger.debug("Found TABLE_ONLY state")
logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly()
else:
raise RuntimeError("Unexpected DbState state: {}".format(self._state))
return tasks
# logger.debug("Found HAS_DATA state")
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
return StateHasData()
def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
return # do nothing
if ( self._state == self.STATE_EMPTY ):
# self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table
if ( self.hasSuccess(tasks, CreateDbTask) ):
self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class
self._state = self.STATE_DB_ONLY
if ( self.hasSuccess(tasks, CreateFixedTableTask )):
self._state = self.STATE_TABLE_ONLY
# else: # no successful table creation, not much we can say, as it is step 2
else: # did not create db
self.assertNoTask(tasks, CreateDbTask) # because we did not have such task
# self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task
self.assertNoSuccess(tasks, CreateFixedTableTask)
elif ( self._state == self.STATE_DB_ONLY ):
self.assertAtMostOneSuccess(tasks, DropDbTask)
self.assertIfExistThenSuccess(tasks, DropDbTask)
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)
# Nothing to be said about adding data task
if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
self.assertAtMostOneSuccess(tasks, DropDbTask)
self._state = self.STATE_EMPTY
elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful
self.assertNoTask(tasks, DropDbTask) # should have have tried
if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# can't say there's add-data attempts, since they may all fail
self._state = self.STATE_TABLE_ONLY
else:
self._state = self.STATE_HAS_DATA
else: # no success in dropping db tasks, no success in create fixed table, not acceptable
raise RuntimeError("Unexpected no-success scenario")
elif ( self._state == self.STATE_TABLE_ONLY ):
if ( self.hasSuccess(tasks, DropFixedTableTask) ):
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
self._state = self.STATE_DB_ONLY
elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table
self.assertNoTask(tasks, DropFixedTableTask)
self._state = self.STATE_HAS_DATA
else: # did not drop table, did not insert data, that is impossible
raise RuntimeError("Unexpected no-success scenarios")
elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust
if ( self.hasSuccess(tasks, DropFixedTableTask) ):
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
self._state = self.STATE_DB_ONLY
elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table
self.assertNoTask(tasks, DropFixedTableTask)
self._state = self.STATE_HAS_DATA
else: # did not drop table, did not insert data, that is impossible
raise RuntimeError("Unexpected no-success scenarios")
else:
raise RuntimeError("Unexpected DbState state: {}".format(self._state))
logger.debug("New DB state is: {}".format(self._state))
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
def assertAtMostOneSuccess(self, tasks, cls):
sCnt = 0
for task in tasks :
if not isinstance(task, cls):
continue
if task.isSuccess():
task.logDebug("Task success found")
sCnt += 1
if ( sCnt >= 2 ):
raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))
# Generic Checks, first based on the start state
if self._state.canCreateDb():
self._state.assertIfExistThenSuccess(tasks, CreateDbTask)
# self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
def assertIfExistThenSuccess(self, tasks, cls):
sCnt = 0
exists = False
for task in tasks :
if not isinstance(task, cls):
continue
exists = True # we have a valid instance
if task.isSuccess():
sCnt += 1
if ( exists and sCnt <= 0 ):
raise RuntimeError("Unexpected zero success for task: {}".format(cls))
if self._state.canDropDb():
self._state.assertIfExistThenSuccess(tasks, DropDbTask)
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
def assertNoTask(self, tasks, cls):
for task in tasks :
if isinstance(task, cls):
raise RuntimeError("Unexpected task: {}".format(cls))
# if self._state.canCreateFixedTable():
# self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
def assertNoSuccess(self, tasks, cls):
for task in tasks :
if isinstance(task, cls):
if task.isSuccess():
raise RuntimeError("Unexpected successful task: {}".format(cls))
# if self._state.canDropFixedTable():
# self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
# self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
def hasSuccess(self, tasks, cls):
for task in tasks :
if not isinstance(task, cls):
continue
if task.isSuccess():
return True
return False
# if self._state.canAddData():
# self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually
# if self._state.canReadData():
# Nothing for sure
newState = self._findCurrentState()
logger.debug("[STT] New DB state determined: {}".format(newState))
self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
self._state = newState
class TaskExecutor():
def __init__(self, curStep):
......@@ -614,10 +895,11 @@ class Task():
@classmethod
def allocTaskNum(cls):
cls.taskSn += 1
return cls.taskSn
Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
# logger.debug("Allocating taskSN: {}".format(Task.taskSn))
return Task.taskSn
def __init__(self, dbState: DbState):
def __init__(self, dbState: DbState, execStats: ExecutionStats):
self._dbState = dbState
self._workerThread = None
self._err = None
......@@ -626,19 +908,22 @@ class Task():
# Assign an incremental task serial number
self._taskNum = self.allocTaskNum()
# logger.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats
def isSuccess(self):
return self._err == None
def clone(self):
newTask = self.__class__(self._dbState)
def clone(self): # TODO: why do we need this again?
newTask = self.__class__(self._dbState, self._execStats)
return newTask
def logDebug(self, msg):
self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))
self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def logInfo(self, msg):
self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))
self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
......@@ -652,29 +937,228 @@ class Task():
self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
self._err = None
self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
try:
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
self.logDebug("[=]Taos Execution exception: {0}".format(err))
self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err))
self._err = err
except:
self.logDebug("[=]Unexpected exception")
self.logDebug("[=] Unexpected exception")
raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
def execSql(self, sql):
return self._dbState.execute(sql)
class CreateDbTask(Task):
class ExecutionStats:
def __init__(self):
self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task
self._tasksInProgress = 0
self._lock = threading.Lock()
self._firstTaskStartTime = None
self._execStartTime = None
self._elapsedTime = 0.0 # total elapsed time
self._accRunTime = 0.0 # accumulated run time
self._failed = False
self._failureReason = None
def startExec(self):
self._execStartTime = time.time()
def endExec(self):
self._elapsedTime = time.time() - self._execStartTime
def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
if klassName not in self._execTimes:
self._execTimes[klassName] = [0, 0]
t = self._execTimes[klassName] # tuple for the data
t[0] += 1 # index 0 has the "total" execution times
if isSuccess:
t[1] += 1 # index 1 has the "success" execution times
def beginTaskType(self, klassName):
with self._lock:
if self._tasksInProgress == 0 : # starting a new round
self._firstTaskStartTime = time.time() # I am now the first task
self._tasksInProgress += 1
def endTaskType(self, klassName, isSuccess):
with self._lock:
self._tasksInProgress -= 1
if self._tasksInProgress == 0 : # all tasks have stopped
self._accRunTime += (time.time() - self._firstTaskStartTime)
self._firstTaskStartTime = None
def registerFailure(self, reason):
self._failed = True
self._failureReason = reason
def logStats(self):
logger.info("----------------------------------------------------------------------")
logger.info("| Crash_Gen test {}, with the following stats:".
format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
logger.info("| Task Execution Times (success/total):")
execTimesAny = 0
for k, n in self._execTimes.items():
execTimesAny += n[0]
logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0]))
logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
logger.info("----------------------------------------------------------------------")
class StateTransitionTask(Task):
# @classmethod
# def getAllTaskClasses(cls): # static
# return cls.__subclasses__()
@classmethod
def getInfo(cls): # each sub class should supply their own information
raise RuntimeError("Overriding method expected")
# @classmethod
# def getBeginStates(cls):
# return cls.getInfo()[0]
@classmethod
def getEndState(cls): # returning the class name
return cls.getInfo()[0]
@classmethod
def canBeginFrom(cls, state: AnyState):
# return state.getValue() in cls.getBeginStates()
raise RuntimeError("must be overriden")
def execute(self, wt: WorkerThread):
super().execute(wt)
class CreateDbTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
# [AnyState.STATE_EMPTY], # can begin from
StateDbOnly() # end state
]
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canCreateDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db")
wt.execSql("create database db")
class DropDbTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
# [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
StateEmpty()
]
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropDb()
class DropDbTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("drop database db")
logger.debug("[OPS] database dropped at {}".format(time.time()))
class CreateFixedSuperTableTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
# [AnyState.STATE_DB_ONLY],
StateSuperTableOnly()
]
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canCreateFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedSuperTableName()
wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(20), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that automatically
class ReadFixedDataTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
None # meaning doesn't affect state
]
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canReadData()
class CreateTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTbName = self._dbState.getFixedSuperTableName()
dbc = wt.getDbConn()
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
rTables = dbc.getQueryResult()
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
for rTbName in rTables : # regular tables
dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
class DropFixedSuperTableTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
StateDbOnly() # meaning doesn't affect state
]
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedSuperTableName()
wt.execSql("drop table db.{}".format(tblName))
class AddFixedDataTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
StateHasData()
]
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canAddData()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
for i in range(10): # 0 to 9
sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format(
i,
ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
#---------- Non State-Transition Related Tasks ----------#
class CreateTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tIndex = self._dbState.addTable()
self.logDebug("Creating a table {} ...".format(tIndex))
......@@ -682,17 +1166,6 @@ class CreateTableTask(Task):
self.logDebug("Table {} created.".format(tIndex))
self._dbState.releaseTable(tIndex)
class CreateFixedTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
class ReadFixedDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
class DropTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tableName = self._dbState.getTableNameToDelete()
......@@ -702,10 +1175,7 @@ class DropTableTask(Task):
self.logInfo("Dropping a table db.{} ...".format(tableName))
wt.execSql("drop table db.{}".format(tableName))
class DropFixedTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
wt.execSql("drop table db.{}".format(tblName))
class AddDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
......@@ -716,16 +1186,11 @@ class AddDataTask(Task):
self.logInfo("No table found to add data, skipping...")
return
sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
self.logDebug("Executing SQL: {}".format(sql))
self.logDebug("[SQL] Executing SQL: {}".format(sql))
wt.execSql(sql)
ds.releaseTable(tIndex)
self.logDebug("Finished adding data")
self.logDebug("[OPS] Finished adding data")
class AddFixedDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState
sql = "insert into db.table_{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
# Deterministic random number generator
class Dice():
......@@ -760,28 +1225,45 @@ class Dice():
# Anyone needing to carry out work should simply come here
class WorkDispatcher():
def __init__(self, dbState):
# self.totalNumMethods = 2
self.tasks = [
CreateTableTask(dbState),
DropTableTask(dbState),
AddDataTask(dbState),
]
# class WorkDispatcher():
# def __init__(self, dbState):
# # self.totalNumMethods = 2
# self.tasks = [
# # CreateTableTask(dbState), # Obsolete
# # DropTableTask(dbState),
# # AddDataTask(dbState),
# ]
# def throwDice(self):
# max = len(self.tasks) - 1
# dRes = random.randint(0, max)
# # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
# return dRes
# def pickTask(self):
# dice = self.throwDice()
# return self.tasks[dice]
# def doWork(self, workerThread):
# task = self.pickTask()
# task.execute(workerThread)
class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord):
if ( record.levelno >= logging.INFO ) :
return True # info or above always log
msg = record.msg
# print("type = {}, value={}".format(type(msg), msg))
# sys.exit()
# Commenting out below to adjust...
# if msg.startswith("[TRD]"):
# return False
return True
def throwDice(self):
max = len(self.tasks) - 1
dRes = random.randint(0, max)
# logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
return dRes
def pickTask(self):
dice = self.throwDice()
return self.tasks[dice]
def doWork(self, workerThread):
task = self.pickTask()
task.execute(workerThread)
def main():
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html
......@@ -810,9 +1292,12 @@ def main():
sys.exit()
global logger
logger = logging.getLogger('myApp')
logger = logging.getLogger('CrashGen')
logger.addFilter(LoggingFilter())
if ( gConfig.debug ):
logger.setLevel(logging.DEBUG) # default seems to be INFO
else:
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)
......@@ -820,12 +1305,21 @@ def main():
Dice.seed(0) # initial seeding of dice
tc = ThreadCoordinator(
ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
WorkDispatcher(dbState),
# WorkDispatcher(dbState), # Obsolete?
dbState
)
# Sandbox testing code
# dbc = dbState.getDbConn()
# while True:
# rows = dbc.query("show databases")
# print("Rows: {}, time={}".format(rows, time.time()))
tc.run()
dbState.cleanUp()
logger.info("Finished running thread pool")
tc.logStats()
dbState.cleanUp()
# logger.info("Crash_Gen execution finished")
if __name__ == "__main__":
main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册