diff --git a/src/connector/grafana/tdengine/package.json b/src/connector/grafana/tdengine/package.json index 0eb7a76be6cfccd81f680f179c8e59499690201b..678278ec4793f8f2d4e5b07c0f7a964961928862 100644 --- a/src/connector/grafana/tdengine/package.json +++ b/src/connector/grafana/tdengine/package.json @@ -1,7 +1,7 @@ { "name": "TDengine", "private": false, - "version": "1.0.0", + "version": "2.0.0", "description": "grafana datasource plugin for tdengine", "scripts": { "build": "./node_modules/grunt-cli/bin/grunt", diff --git a/src/connector/jdbc/CMakeLists.txt b/src/connector/jdbc/CMakeLists.txt index 05c8bebafe79469466fa956c479aab80ccbc5943..7f823b97b266dae11ce1ba384cc812a1a9d6b691 100644 --- a/src/connector/jdbc/CMakeLists.txt +++ b/src/connector/jdbc/CMakeLists.txt @@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED) ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME} POST_BUILD COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml - COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-1.0.3-dist.jar ${LIBRARY_OUTPUT_PATH} + COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.0-dist.jar ${LIBRARY_OUTPUT_PATH} COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMENT "build jdbc driver") ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME}) diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 5aa4f6a2e3e41542fd25b655f9959da13fe4efdb..2f0c3c78e75f557ba8c3e4590057a412684f3c96 100755 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.taosdata.jdbc taos-jdbcdriver - 1.0.3 + 2.0.0 jar JDBCDriver https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc diff --git a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java index 55ab2fdc52ab183264088381ca4f2e475c0c4c08..03a4761b9155838c2a23afedc1c98573a3313fd8 100644 --- a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java +++ b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java @@ -8,7 +8,7 @@ import java.util.Properties; public class TestAsyncTSDBSubscribe { public static void main(String[] args) throws SQLException { - String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " + + String usage = "java -cp taos-jdbcdriver-2.0.0_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " + "-tname tableName -h host"; if (args.length < 2) { System.err.println(usage); diff --git a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java index f628f3cada5ef1542fc756184d6d6ca4955cd422..598ef4bbc02cd4beb7ba18bb1794a572fae16b13 100644 --- a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java +++ b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java @@ -10,7 +10,7 @@ import java.util.Properties; public class TestTSDBSubscribe { public static void main(String[] args) throws Exception { - String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " + + String usage = "java -cp taos-jdbcdriver-2.0.0_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " + "-topic topicName -tname tableName -h host"; if (args.length < 2) { System.err.println(usage); diff --git a/src/connector/nodejs/package.json b/src/connector/nodejs/package.json index 8d7a971aa318844b8052cc17ae4becfab74cebab..2bc4a2453debac241744fe114cb6bae654e882e6 100644 --- a/src/connector/nodejs/package.json +++ b/src/connector/nodejs/package.json @@ -1,6 +1,6 @@ { "name": "td-connector", - "version": "1.6.1", + "version": "2.0.0", "description": "A Node.js connector for TDengine.", "main": "tdengine.js", "scripts": { diff --git a/src/connector/python/linux/python2/setup.py b/src/connector/python/linux/python2/setup.py index ae5ebad671bc528ffcbc48c945c3df437fde0f1e..2e4f80b8f01723d0b0921051e2b1bacc96d30f0d 100644 --- a/src/connector/python/linux/python2/setup.py +++ b/src/connector/python/linux/python2/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="taos", - version="1.4.15", + version="2.0.0", author="Taosdata Inc.", author_email="support@taosdata.com", description="TDengine python client package", diff --git a/src/connector/python/linux/python2/taos.egg-info/PKG-INFO b/src/connector/python/linux/python2/taos.egg-info/PKG-INFO index ce6d8c58b2199d5cc15fa7042e089383c30c6e20..96bf9059fd076214c159e8824c3e060bb7c683df 100644 --- a/src/connector/python/linux/python2/taos.egg-info/PKG-INFO +++ b/src/connector/python/linux/python2/taos.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: taos -Version: 1.4.15 +Version: 2.0.0 Summary: TDengine python client package Home-page: https://github.com/pypa/sampleproject Author: Taosdata Inc. diff --git a/src/connector/python/linux/python3/setup.py b/src/connector/python/linux/python3/setup.py index 0669953ca32083b7878190a481e1a1c13f1260dc..03a49fc1c5bac82f66d9c4c8dcc5f796ab8209a9 100644 --- a/src/connector/python/linux/python3/setup.py +++ b/src/connector/python/linux/python3/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="taos", - version="1.4.15", + version="2.0.0", author="Taosdata Inc.", author_email="support@taosdata.com", description="TDengine python client package", diff --git a/src/connector/python/linux/python3/taos.egg-info/PKG-INFO b/src/connector/python/linux/python3/taos.egg-info/PKG-INFO index b1a77c8ac7359192ed1c40ddfe79fb29aa1a596e..1e6c829ef1d8331852e89204596f60cd00a17dfc 100644 --- a/src/connector/python/linux/python3/taos.egg-info/PKG-INFO +++ b/src/connector/python/linux/python3/taos.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: taos -Version: 1.4.15 +Version: 2.0.0 Summary: TDengine python client package Home-page: https://github.com/pypa/sampleproject Author: Taosdata Inc. diff --git a/src/connector/python/windows/python2/setup.py b/src/connector/python/windows/python2/setup.py index cace06f443b113b1efef4e15133eee96bbef91d3..fd82a55650fcc284892d8387fbcf8666398f4d1a 100644 --- a/src/connector/python/windows/python2/setup.py +++ b/src/connector/python/windows/python2/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="taos", - version="1.4.15", + version="2.0.0", author="Taosdata Inc.", author_email="support@taosdata.com", description="TDengine python client package", diff --git a/src/connector/python/windows/python2/taos.egg-info/PKG-INFO b/src/connector/python/windows/python2/taos.egg-info/PKG-INFO index 9673ab6c5cd27eaa998b6e537bee90e72aeffc8d..9babb669a7570ca0dd2dcf2e808b9c8a07a61adc 100644 --- a/src/connector/python/windows/python2/taos.egg-info/PKG-INFO +++ b/src/connector/python/windows/python2/taos.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: taos -Version: 1.4.15 +Version: 2.0.0 Summary: TDengine python client package Home-page: https://github.com/pypa/sampleproject Author: Taosdata Inc. diff --git a/src/connector/python/windows/python3/setup.py b/src/connector/python/windows/python3/setup.py index 7cac04b723ae557194c9d52c23a594bda1934b43..9abdace5a9953279d5d7ba9ec6eeab7dfe0500fe 100644 --- a/src/connector/python/windows/python3/setup.py +++ b/src/connector/python/windows/python3/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="taos", - version="1.4.15", + version="2.0.0", author="Taosdata Inc.", author_email="support@taosdata.com", description="TDengine python client package", diff --git a/src/connector/python/windows/python3/taos.egg-info/PKG-INFO b/src/connector/python/windows/python3/taos.egg-info/PKG-INFO index 191327092b8e08799979cfa2ff7e7818af513c7f..6213b0d165ed4d807e35038807c9f20b33cd921e 100644 --- a/src/connector/python/windows/python3/taos.egg-info/PKG-INFO +++ b/src/connector/python/windows/python3/taos.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: taos -Version: 1.4.15 +Version: 2.0.0 Summary: TDengine python client package Home-page: https://github.com/pypa/sampleproject Author: Hongze Cheng diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index bff2403bb8b00537fb25ceab93ddf34263dc1c17..916e8904ff6f0f7a9f0cbbb306ee5823d6281334 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -216,7 +216,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbManager.transition(self._executedTasks) # at end of step, transiton the DB state + self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -289,7 +289,7 @@ class ThreadCoordinator: # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return tasks[i].clone() # TODO: still necessary? - taskType = self.getDbManager().pickTaskType() # pick a task type for current state + taskType = self.getDbManager().getStateMachine().pickTaskType() # pick a task type for current state return taskType(self.getDbManager(), self._execStats) # create a task from it def resetExecutedTasks(self): @@ -686,23 +686,126 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one + self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) +class StateMechine : + def __init__(self, dbConn): + self._dbConn = dbConn + self._curState = self._findCurrentState() # starting state + self._stateWeights = [1,3,5,15] # transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. + + def getCurrentState(self): + return self._curState + + # May be slow, use cautionsly... + def getTaskTypes(self): # those that can run (directly/indirectly) from the current state + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks + firstTaskTypes = [] + for tc in allTaskClasses: + # t = tc(self) # create task object + if tc.canBeginFrom(self._curState): + firstTaskTypes.append(tc) + # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones + taskTypes = firstTaskTypes.copy() # have to have these + for task1 in firstTaskTypes: # each task type gathered so far + endState = task1.getEndState() # figure the end state + if endState == None: # does not change end state + continue # no use, do nothing + for tc in allTaskClasses: # what task can further begin from there? + if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): + taskTypes.append(tc) # gather it + + if len(taskTypes) <= 0: + raise RuntimeError("No suitable task types found for state: {}".format(self._curState)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes)) + return taskTypes + + def _findCurrentState(self): + dbc = self._dbConn + ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state + if dbc.query("show databases") == 0 : # no database?! + # logger.debug("Found EMPTY state") + logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) + return StateEmpty() + dbc.execute("use db") # did not do this when openning connection + if dbc.query("show tables") == 0 : # no tables + # logger.debug("Found DB ONLY state") + logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) + return StateDbOnly() + if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) ) == 0 : # no regular tables + # logger.debug("Found TABLE_ONLY state") + logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) + return StateSuperTableOnly() + else: # has actual tables + # logger.debug("Found HAS_DATA state") + logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) + return StateHasData() + + def transition(self, tasks): + if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + return # do nothing + + self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps + + # Generic Checks, first based on the start state + if self._curState.canCreateDb(): + self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb) + # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops + + if self._curState.canDropDb(): + self._curState.assertIfExistThenSuccess(tasks, TaskDropDb) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop + + # if self._state.canCreateFixedTable(): + # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create + + # if self._state.canDropFixedTable(): + # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped + # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop + + # if self._state.canAddData(): + # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually + + # if self._state.canReadData(): + # Nothing for sure + + newState = self._findCurrentState() + logger.debug("[STT] New DB state determined: {}".format(newState)) + self._curState.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? + self._curState = newState + + def pickTaskType(self): + taskTypes = self.getTaskTypes() # all the task types we can choose from at curent state + weights = [] + for tt in taskTypes: + endState = tt.getEndState() + if endState != None : + weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method + else: + weights.append(10) # read data task, default to 10: TODO: change to a constant + i = self._weighted_choice_sub(weights) + # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + return taskTypes[i] + + def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ + rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? + for i, w in enumerate(weights): + rnd -= w + if rnd < 0: + return i # Manager of the Database Data/Connection -class DbManager(): - +class DbManager(): def __init__(self, resetDb = True): self.tableNumQueue = LinearQueue() self._lastTick = self.setupLastTick() # datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - - self._state = StateInvalid() # starting state - self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. # self.openDbServerConnection() self._dbConn = DbConn() @@ -710,7 +813,7 @@ class DbManager(): self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected except taos.error.ProgrammingError as err: # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) - if ( err.msg == 'disconnected' ): # cannot open DB connection + if ( err.msg == 'client disconnected' ): # cannot open DB connection print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") sys.exit() else: @@ -721,13 +824,17 @@ class DbManager(): if resetDb : self._dbConn.resetDb() # drop and recreate DB - self._state = self._findCurrentState() + self._stateMachine = StateMechine(self._dbConn) # Do this after dbConn is in proper shape + def getDbConn(self): return self._dbConn - def getState(self): - return self._state + def getStateMachine(self): + return self._stateMachine + + # def getState(self): + # return self._stateMachine.getCurrentState() # We aim to create a starting time tick, such that, whenever we run our test here once # We should be able to safely create 100,000 records, which will not have any repeated time stamp @@ -754,7 +861,8 @@ class DbManager(): tIndex = self.tableNumQueue.push() return tIndex - def getFixedSuperTableName(self): + @classmethod + def getFixedSuperTableName(cls): return "fs_table" def releaseTable(self, i): # return the table back, so others can use it @@ -786,122 +894,6 @@ class DbManager(): def cleanUp(self): self._dbConn.close() - # May be slow, use cautionsly... - def getTaskTypesAtState(self): - allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks - firstTaskTypes = [] - for tc in allTaskClasses: - # t = tc(self) # create task object - if tc.canBeginFrom(self._state): - firstTaskTypes.append(tc) - # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones - taskTypes = firstTaskTypes.copy() # have to have these - for task1 in firstTaskTypes: # each task type gathered so far - endState = task1.getEndState() # figure the end state - if endState == None: - continue - for tc in allTaskClasses: # what task can further begin from there? - if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): - taskTypes.append(tc) # gather it - - if len(taskTypes) <= 0: - raise RuntimeError("No suitable task types found for state: {}".format(self._state)) - logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes)) - return taskTypes - - # tasks.append(ReadFixedDataTask(self)) # always for everybody - # if ( self._state == self.STATE_EMPTY ): - # tasks.append(CreateDbTask(self)) - # tasks.append(CreateFixedTableTask(self)) - # elif ( self._state == self.STATE_DB_ONLY ): - # tasks.append(DropDbTask(self)) - # tasks.append(CreateFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # elif ( self._state == self.STATE_TABLE_ONLY ): - # tasks.append(DropFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust - # tasks.append(DropFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # else: - # raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - # return tasks - - def pickTaskType(self): - taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state - weights = [] - for tt in taskTypes: - endState = tt.getEndState() - if endState != None : - weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method - else: - weights.append(10) # read data task, default to 10: TODO: change to a constant - i = self._weighted_choice_sub(weights) - # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) - return taskTypes[i] - - def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ - rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? - for i, w in enumerate(weights): - rnd -= w - if rnd < 0: - return i - - def _findCurrentState(self): - dbc = self._dbConn - ts = time.time() - if dbc.query("show databases") == 0 : # no database?! - # logger.debug("Found EMPTY state") - logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) - return StateEmpty() - dbc.execute("use db") # did not do this when openning connection - if dbc.query("show tables") == 0 : # no tables - # logger.debug("Found DB ONLY state") - logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) - return StateDbOnly() - if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data - # logger.debug("Found TABLE_ONLY state") - logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) - return StateSuperTableOnly() - else: - # logger.debug("Found HAS_DATA state") - logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) - return StateHasData() - - def transition(self, tasks): - if ( len(tasks) == 0 ): # before 1st step, or otherwise empty - return # do nothing - - self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps - - # Generic Checks, first based on the start state - if self._state.canCreateDb(): - self._state.assertIfExistThenSuccess(tasks, TaskCreateDb) - # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops - - if self._state.canDropDb(): - self._state.assertIfExistThenSuccess(tasks, TaskDropDb) - # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop - - # if self._state.canCreateFixedTable(): - # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped - # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create - - # if self._state.canDropFixedTable(): - # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped - # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop - - # if self._state.canAddData(): - # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually - - # if self._state.canReadData(): - # Nothing for sure - - newState = self._findCurrentState() - logger.debug("[STT] New DB state determined: {}".format(newState)) - self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? - self._state = newState - class TaskExecutor(): def __init__(self, curStep): self._curStep = curStep @@ -928,7 +920,7 @@ class Task(): return Task.taskSn def __init__(self, dbManager: DbManager, execStats: ExecutionStats): - self._dbState = dbManager + self._dbManager = dbManager self._workerThread = None self._err = None self._curStep = None @@ -944,7 +936,7 @@ class Task(): return self._err == None def clone(self): # TODO: why do we need this again? - newTask = self.__class__(self._dbState, self._execStats) + newTask = self.__class__(self._dbManager, self._execStats) return newTask def logDebug(self, msg): @@ -980,7 +972,7 @@ class Task(): self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. def execSql(self, sql): - return self._dbState.execute(sql) + return self._dbManager.execute(sql) class ExecutionStats: @@ -1047,20 +1039,22 @@ class ExecutionStats: class StateTransitionTask(Task): - # @classmethod - # def getAllTaskClasses(cls): # static - # return cls.__subclasses__() @classmethod def getInfo(cls): # each sub class should supply their own information raise RuntimeError("Overriding method expected") + _endState = None + @classmethod + def getEndState(cls): # TODO: optimize by calling it fewer times + raise RuntimeError("Overriding method expected") + # @classmethod # def getBeginStates(cls): # return cls.getInfo()[0] - @classmethod - def getEndState(cls): # returning the class name - return cls.getInfo()[0] + # @classmethod + # def getEndState(cls): # returning the class name + # return cls.getInfo()[0] @classmethod def canBeginFrom(cls, state: AnyState): @@ -1070,15 +1064,10 @@ class StateTransitionTask(Task): def execute(self, wt: WorkerThread): super().execute(wt) - - class TaskCreateDb(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_EMPTY], # can begin from - StateDbOnly() # end state - ] + def getEndState(cls): + return StateDbOnly() @classmethod def canBeginFrom(cls, state: AnyState): @@ -1089,11 +1078,8 @@ class TaskCreateDb(StateTransitionTask): class TaskDropDb(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateEmpty() - ] + def getEndState(cls): + return StateEmpty() @classmethod def canBeginFrom(cls, state: AnyState): @@ -1105,36 +1091,30 @@ class TaskDropDb(StateTransitionTask): class TaskCreateSuperTable(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_DB_ONLY], - StateSuperTableOnly() - ] + def getEndState(cls): + return StateSuperTableOnly() @classmethod def canBeginFrom(cls, state: AnyState): return state.canCreateFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedSuperTableName() + tblName = self._dbManager.getFixedSuperTableName() wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that automatically class TaskReadData(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - None # meaning doesn't affect state - ] + def getEndState(cls): + return None # meaning doesn't affect state @classmethod def canBeginFrom(cls, state: AnyState): return state.canReadData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - sTbName = self._dbState.getFixedSuperTableName() + sTbName = self._dbManager.getFixedSuperTableName() dbc = wt.getDbConn() dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations @@ -1150,20 +1130,38 @@ class TaskReadData(StateTransitionTask): class TaskDropSuperTable(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateDbOnly() # meaning doesn't affect state - ] + def getEndState(cls): + return StateDbOnly() @classmethod def canBeginFrom(cls, state: AnyState): return state.canDropFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedSuperTableName() + tblName = self._dbManager.getFixedSuperTableName() wt.execSql("drop table db.{}".format(tblName)) +class TaskAlterTags(StateTransitionTask): + @classmethod + def getEndState(cls): + return None # meaning doesn't affect state + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropFixedSuperTable() # if we can drop it, we can alter tags + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbManager.getFixedSuperTableName() + dice = Dice.throw(4) + if dice == 0 : + wt.execSql("alter table db.{} add tag extraTag int".format(tblName)) + elif dice == 1 : + wt.execSql("alter table db.{} drop tag extraTag".format(tblName)) + elif dice == 2 : + wt.execSql("alter table db.{} drop tag newTag".format(tblName)) + else: # dice == 3 + wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName)) + class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on LARGE_NUMBER_OF_TABLES = 35 @@ -1186,18 +1184,15 @@ class TaskAddData(StateTransitionTask): cls.fAddLogDone = open("add_log_done.txt", "w") @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateHasData() - ] + def getEndState(cls): + return StateHasData() @classmethod def canBeginFrom(cls, state: AnyState): return state.canAddData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState + ds = self._dbManager wt.execSql("use db") # TODO: seems to be an INSERT bug to require this tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) @@ -1301,7 +1296,67 @@ class LoggingFilter(logging.Filter): # return False return True +class MainExec: + @classmethod + def runClient(cls): + # resetDb = False # DEBUG only + # dbState = DbState(resetDb) # DBEUG only! + dbManager = DbManager() # Regular function + Dice.seed(0) # initial seeding of dice + thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) + tc = ThreadCoordinator(thPool, dbManager) + tc.run() + tc.logStats() + dbManager.cleanUp() + + @classmethod + def runService(cls): + print("Running service...") + + @classmethod + def runTemp(cls): # for debugging purposes + # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix + # dbc = dbState.getDbConn() + # sTbName = dbState.getFixedSuperTableName() + # dbc.execute("create database if not exists db") + # if not dbState.getState().equals(StateEmpty()): + # dbc.execute("use db") + + # rTables = None + # try: # the super table may not exist + # sql = "select TBNAME from db.{}".format(sTbName) + # logger.info("Finding out tables in super table: {}".format(sql)) + # dbc.query(sql) # TODO: analyze result set later + # logger.info("Fetching result") + # rTables = dbc.getQueryResult() + # logger.info("Result: {}".format(rTables)) + # except taos.error.ProgrammingError as err: + # logger.info("Initial Super table OPS error: {}".format(err)) + + # # sys.exit() + # if ( not rTables == None): + # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + # try: + # for rTbName in rTables : # regular tables + # ds = dbState + # logger.info("Inserting into table: {}".format(rTbName[0])) + # sql = "insert into db.{} values ('{}', {});".format( + # rTbName[0], + # ds.getNextTick(), ds.getNextInt()) + # dbc.execute(sql) + # for rTbName in rTables : # regular tables + # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + # logger.info("Initial READING operation is successful") + # except taos.error.ProgrammingError as err: + # logger.info("Initial WRITE/READ error: {}".format(err)) + + # Sandbox testing code + # dbc = dbState.getDbConn() + # while True: + # rows = dbc.query("show databases") + # print("Rows: {}, time={}".format(rows, time.time())) + return def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html @@ -1314,24 +1369,27 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) + parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-e', '--run-tdengine', action='store_true', + help='Run TDengine service in foreground (default: false)') parser.add_argument('-l', '--larger-data', action='store_true', help='Write larger amount of data during write operations (default: false)') - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + parser.add_argument('-p', '--per-thread-db-connection', action='store_false', help='Use a single shared db connection (default: false)') parser.add_argument('-r', '--record-ops', action='store_true', help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') - parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, help='Maximum number of steps to run (default: 100)') - parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, help='Number of threads to run (default: 10)') global gConfig gConfig = parser.parse_args() - if len(sys.argv) == 1: - parser.print_help() - sys.exit() + # if len(sys.argv) == 1: + # parser.print_help() + # sys.exit() global logger logger = logging.getLogger('CrashGen') @@ -1343,62 +1401,11 @@ def main(): ch = logging.StreamHandler() logger.addHandler(ch) - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! - dbManager = DbManager() # Regular function - Dice.seed(0) # initial seeding of dice - tc = ThreadCoordinator( - ThreadPool(gConfig.num_threads, gConfig.max_steps), - # WorkDispatcher(dbState), # Obsolete? - dbManager - ) - - # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix - # dbc = dbState.getDbConn() - # sTbName = dbState.getFixedSuperTableName() - # dbc.execute("create database if not exists db") - # if not dbState.getState().equals(StateEmpty()): - # dbc.execute("use db") - - # rTables = None - # try: # the super table may not exist - # sql = "select TBNAME from db.{}".format(sTbName) - # logger.info("Finding out tables in super table: {}".format(sql)) - # dbc.query(sql) # TODO: analyze result set later - # logger.info("Fetching result") - # rTables = dbc.getQueryResult() - # logger.info("Result: {}".format(rTables)) - # except taos.error.ProgrammingError as err: - # logger.info("Initial Super table OPS error: {}".format(err)) - - # # sys.exit() - # if ( not rTables == None): - # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - # try: - # for rTbName in rTables : # regular tables - # ds = dbState - # logger.info("Inserting into table: {}".format(rTbName[0])) - # sql = "insert into db.{} values ('{}', {});".format( - # rTbName[0], - # ds.getNextTick(), ds.getNextInt()) - # dbc.execute(sql) - # for rTbName in rTables : # regular tables - # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure - # logger.info("Initial READING operation is successful") - # except taos.error.ProgrammingError as err: - # logger.info("Initial WRITE/READ error: {}".format(err)) - - + if gConfig.run_tdengine : # run server + MainExec.runService() + else : + MainExec.runClient() - # Sandbox testing code - # dbc = dbState.getDbConn() - # while True: - # rows = dbc.query("show databases") - # print("Rows: {}, time={}".format(rows, time.time())) - - tc.run() - tc.logStats() - dbManager.cleanUp() # logger.info("Crash_Gen execution finished")