提交 370fadb0 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
from .constants import FieldType from .constants import FieldType
import threading
# querySeqNum = 0 # querySeqNum = 0
...@@ -37,6 +38,7 @@ class TDengineCursor(object): ...@@ -37,6 +38,7 @@ class TDengineCursor(object):
self._block_iter = 0 self._block_iter = 0
self._affected_rows = 0 self._affected_rows = 0
self._logfile = "" self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None: if connection is not None:
self._connection = connection self._connection = connection
...@@ -103,6 +105,12 @@ class TDengineCursor(object): ...@@ -103,6 +105,12 @@ class TDengineCursor(object):
def execute(self, operation, params=None): def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command). """Prepare and execute a database operation (query or command).
""" """
# if threading.get_ident() != self._threadId:
# info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if not operation: if not operation:
return None return None
...@@ -188,6 +196,11 @@ class TDengineCursor(object): ...@@ -188,6 +196,11 @@ class TDengineCursor(object):
def fetchall(self): def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation. """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
""" """
# if threading.get_ident() != self._threadId:
# info ="[WARNING] Cursor fetchall:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if self._result is None or self._fields is None: if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall") raise OperationalError("Invalid use of fetchall")
...@@ -232,6 +245,12 @@ class TDengineCursor(object): ...@@ -232,6 +245,12 @@ class TDengineCursor(object):
def _handle_result(self): def _handle_result(self):
"""Handle the return result from query. """Handle the return result from query.
""" """
# if threading.get_ident() != self._threadId:
# info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
self._description = [] self._description = []
for ele in self._fields: for ele in self._fields:
self._description.append( self._description.append(
......
...@@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { ...@@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
} }
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) {
SMnodeMsg *pWrite = pRaw; SMnodeMsg *pWrite = pMsg;
if (pWrite == NULL) return; if (pWrite == NULL) return;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
......
...@@ -206,9 +206,10 @@ static void shellSourceFile(TAOS *con, char *fptr) { ...@@ -206,9 +206,10 @@ static void shellSourceFile(TAOS *con, char *fptr) {
if (code != 0) { if (code != 0) {
fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo); fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo);
/* free local resouce: allocated memory/metric-meta refcnt */
taos_free_result(pSql);
} }
/* free local resouce: allocated memory/metric-meta refcnt */
taos_free_result(pSql);
memset(cmd, 0, MAX_COMMAND_SIZE); memset(cmd, 0, MAX_COMMAND_SIZE);
cmd_len = 0; cmd_len = 0;
......
...@@ -520,9 +520,8 @@ int main(int argc, char *argv[]) { ...@@ -520,9 +520,8 @@ int main(int argc, char *argv[]) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
queryDB(taos, command); queryDB(taos, command);
printf("meters created!\n"); printf("meters created!\n");
taos_close(taos);
} }
taos_close(taos);
/* Wait for table to create */ /* Wait for table to create */
multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass); multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass);
...@@ -792,9 +791,6 @@ void * createTable(void *sarg) ...@@ -792,9 +791,6 @@ void * createTable(void *sarg)
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols); snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
queryDB(winfo->taos, command); queryDB(winfo->taos, command);
} }
taos_close(winfo->taos);
} else { } else {
/* Create all the tables; */ /* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
...@@ -812,7 +808,6 @@ void * createTable(void *sarg) ...@@ -812,7 +808,6 @@ void * createTable(void *sarg)
} }
queryDB(winfo->taos, command); queryDB(winfo->taos, command);
} }
taos_close(winfo->taos);
} }
return NULL; return NULL;
......
...@@ -53,6 +53,7 @@ typedef struct { ...@@ -53,6 +53,7 @@ typedef struct {
void * rowData; void * rowData;
int32_t rowSize; int32_t rowSize;
int32_t retCode; // for callback in sdb queue int32_t retCode; // for callback in sdb queue
int32_t processedCount; // for sync fwd callback
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
struct SMnodeMsg *pMsg; struct SMnodeMsg *pMsg;
} SSdbOper; } SSdbOper;
......
...@@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) { ...@@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
} }
static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) { static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj; SDnodeObj *pNew = pOper->pObj;
SDnodeObj *pSaved = mnodeGetDnode(pDnode->dnodeId); SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId);
if (pSaved != NULL && pDnode != pSaved) { if (pDnode != NULL && pNew != pDnode) {
memcpy(pSaved, pDnode, pOper->rowSize); memcpy(pDnode, pNew, pOper->rowSize);
free(pDnode); free(pNew);
mnodeDecDnodeRef(pSaved);
} }
mnodeDecDnodeRef(pDnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -72,8 +72,6 @@ typedef struct { ...@@ -72,8 +72,6 @@ typedef struct {
void * sync; void * sync;
void * wal; void * wal;
SSyncCfg cfg; SSyncCfg cfg;
sem_t sem;
int32_t code;
int32_t numOfTables; int32_t numOfTables;
SSdbTable *tableList[SDB_TABLE_MAX]; SSdbTable *tableList[SDB_TABLE_MAX];
pthread_mutex_t mutex; pthread_mutex_t mutex;
...@@ -244,27 +242,36 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { ...@@ -244,27 +242,36 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
} }
FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
tsSdbObj.code = code; assert(param);
sem_post(&tsSdbObj.sem); SSdbOper * pOper = param;
sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); SMnodeMsg *pMsg = pOper->pMsg;
} if (code <= 0) pOper->retCode = code;
int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1);
if (processedCount <= 1) {
if (pMsg != NULL) {
sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount);
}
return;
}
static int32_t sdbForwardToPeer(SWalHead *pHead) { if (pMsg != NULL) {
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg);
}
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); if (pOper->cb != NULL) {
if (code > 0) { pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode);
sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); }
sem_wait(&tsSdbObj.sem);
return tsSdbObj.code; dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
} taosFreeQitem(pOper);
return code;
} }
void sdbUpdateSync() { void sdbUpdateSync() {
SSyncCfg syncCfg = {0}; SSyncCfg syncCfg = {0};
int32_t index = 0; int32_t index = 0;
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
for (int32_t i = 0; i < mnodes->nodeNum; ++i) { for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
...@@ -298,7 +305,7 @@ void sdbUpdateSync() { ...@@ -298,7 +305,7 @@ void sdbUpdateSync() {
} }
syncCfg.replica = index; syncCfg.replica = index;
syncCfg.quorum = (syncCfg.replica == 1) ? 1:2; syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
bool hasThisDnode = false; bool hasThisDnode = false;
for (int32_t i = 0; i < syncCfg.replica; ++i) { for (int32_t i = 0; i < syncCfg.replica; ++i) {
...@@ -325,10 +332,10 @@ void sdbUpdateSync() { ...@@ -325,10 +332,10 @@ void sdbUpdateSync() {
syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.writeToCache = sdbWriteToQueue; syncInfo.writeToCache = sdbWriteToQueue;
syncInfo.confirmForward = sdbConfirmForward; syncInfo.confirmForward = sdbConfirmForward;
syncInfo.notifyRole = sdbNotifyRole; syncInfo.notifyRole = sdbNotifyRole;
tsSdbObj.cfg = syncCfg; tsSdbObj.cfg = syncCfg;
if (tsSdbObj.sync) { if (tsSdbObj.sync) {
syncReconfig(tsSdbObj.sync, &syncCfg); syncReconfig(tsSdbObj.sync, &syncCfg);
} else { } else {
...@@ -339,7 +346,6 @@ void sdbUpdateSync() { ...@@ -339,7 +346,6 @@ void sdbUpdateSync() {
int32_t sdbInit() { int32_t sdbInit() {
pthread_mutex_init(&tsSdbObj.mutex, NULL); pthread_mutex_init(&tsSdbObj.mutex, NULL);
sem_init(&tsSdbObj.sem, 0, 0);
if (sdbInitWriteWorker() != 0) { if (sdbInitWriteWorker() != 0) {
return -1; return -1;
...@@ -379,7 +385,6 @@ void sdbCleanUp() { ...@@ -379,7 +385,6 @@ void sdbCleanUp() {
tsSdbObj.wal = NULL; tsSdbObj.wal = NULL;
} }
sem_destroy(&tsSdbObj.sem);
pthread_mutex_destroy(&tsSdbObj.mutex); pthread_mutex_destroy(&tsSdbObj.mutex);
} }
...@@ -513,24 +518,22 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -513,24 +518,22 @@ static int sdbWrite(void *param, void *data, int type) {
assert(pTable != NULL); assert(pTable != NULL);
pthread_mutex_lock(&tsSdbObj.mutex); pthread_mutex_lock(&tsSdbObj.mutex);
if (pHead->version == 0) { if (pHead->version == 0) {
// assign version // assign version
tsSdbObj.version++; tsSdbObj.version++;
pHead->version = tsSdbObj.version; pHead->version = tsSdbObj.version;
} else { } else {
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
if (pHead->version <= tsSdbObj.version) { if (pHead->version <= tsSdbObj.version) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) { sdbDebug("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version); pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbObj.version + 1) { } else if (pHead->version != tsSdbObj.version + 1) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64, sdbError("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
tsSdbObj.version);
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} else { } else {
tsSdbObj.version = pHead->version; tsSdbObj.version = pHead->version;
...@@ -542,28 +545,36 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -542,28 +545,36 @@ static int sdbWrite(void *param, void *data, int type) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
return code; return code;
} }
code = sdbForwardToPeer(pHead);
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
// from app, oper is created // from app, oper is created
if (pOper != NULL) { if (pOper != NULL) {
sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s", // forward to peers
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->processedCount = 0;
tstrerror(code)); int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
return code; if (syncCode <= 0) pOper->processedCount = 1;
if (syncCode < 0) {
sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
} else if (syncCode > 0) {
sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
} else {
sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
}
return syncCode;
} }
// from wal or forward msg, oper not created, should add into hash sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
if (tsSdbObj.sync != NULL) { sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it",
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
} else {
sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
}
// even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
// from wal or forward msg, oper not created, should add into hash
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
code = (*pTable->decodeFp)(&oper); code = (*pTable->decodeFp)(&oper);
...@@ -627,7 +638,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -627,7 +638,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -677,7 +688,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -677,7 +688,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -727,7 +738,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -727,7 +738,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -943,20 +954,20 @@ static void *sdbWorkerFp(void *param) { ...@@ -943,20 +954,20 @@ static void *sdbWorkerFp(void *param) {
taosGetQitem(tsSdbWriteQall, &type, &item); taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
pOper->processedCount = 1;
pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
if (pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
pOper = NULL; pOper = NULL;
} }
if (pOper != NULL && pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
int32_t code = sdbWrite(pOper, pHead, type); int32_t code = sdbWrite(pOper, pHead, type);
if (pOper) pOper->retCode = code; if (pOper && code <= 0) pOper->retCode = code;
} }
walFsync(tsSdbObj.wal); walFsync(tsSdbObj.wal);
...@@ -965,25 +976,17 @@ static void *sdbWorkerFp(void *param) { ...@@ -965,25 +976,17 @@ static void *sdbWorkerFp(void *param) {
taosResetQitems(tsSdbWriteQall); taosResetQitems(tsSdbWriteQall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(tsSdbWriteQall, &type, &item); taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
if (pOper != NULL && pOper->cb != NULL) { sdbDecRef(pOper->table, pOper->pObj);
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); sdbConfirmForward(NULL, pOper, pOper->retCode);
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) {
} syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
taosFreeQitem(item);
if (pOper != NULL && pOper->pMsg != NULL) { } else {
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, taosFreeQitem(item);
tstrerror(pOper->retCode));
}
if (pOper != NULL) {
sdbDecRef(pOper->table, pOper->pObj);
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
} }
taosFreeQitem(item);
} }
} }
......
...@@ -783,9 +783,15 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -783,9 +783,15 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
if (pTable != NULL) { assert(pTable);
mLInfo("app:%p:%p, stable:%s, is created in sdb, result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code)); if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb", pTable->info.tableId);
} else {
mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb};
sdbDeleteRow(&desc);
} }
return code; return code;
...@@ -1561,10 +1567,16 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1561,10 +1567,16 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
assert(pTable); assert(pTable);
mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg, if (code == TSDB_CODE_SUCCESS) {
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); mDebug("app:%p:%p, table:%s, create table in sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pTable->sid, pTable->uid);
if (code != TSDB_CODE_SUCCESS) return code; } else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
return code;
}
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
......
...@@ -348,17 +348,23 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) { ...@@ -348,17 +348,23 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
} }
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pMsg->pVgroup = NULL; mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbDeleteRow(&desc);
return code; return code;
} }
SVgObj *pVgroup = pMsg->pVgroup; mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
SDbObj *pDb = pMsg->pDb; pDb->name, pVgroup->numOfVnodes);
mInfo("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mInfo("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); mInfo("app:%p:%p, vgId:%d, index:%d, dnode:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, i,
pVgroup->vnodeGid[i].dnodeId);
} }
mnodeIncVgroupRef(pVgroup); mnodeIncVgroupRef(pVgroup);
......
...@@ -156,6 +156,7 @@ int main(int argc, char *argv[]) { ...@@ -156,6 +156,7 @@ int main(int argc, char *argv[]) {
} }
tInfo("client is initialized"); tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
......
...@@ -24,23 +24,21 @@ int msgSize = 128; ...@@ -24,23 +24,21 @@ int msgSize = 128;
int commit = 0; int commit = 0;
int dataFd = -1; int dataFd = -1;
void *qhandle = NULL; void *qhandle = NULL;
void *qset = NULL;
void processShellMsg() { void processShellMsg() {
static int num = 0; static int num = 0;
taos_qall qall; taos_qall qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type; int type;
void *pvnode;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, qall); int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode);
if (numOfMsgs <= 0) {
usleep(100);
continue;
}
tDebug("%d shell msgs are received", numOfMsgs); tDebug("%d shell msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &type, (void **)&pRpcMsg); taosGetQitem(qall, &type, (void **)&pRpcMsg);
...@@ -82,15 +80,6 @@ void processShellMsg() { ...@@ -82,15 +80,6 @@ void processShellMsg() {
} }
taosFreeQall(qall); taosFreeQall(qall);
/*
SRpcIpSet ipSet;
ipSet.numOfIps = 1;
ipSet.index = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr("192.168.0.2");
rpcSendRedirectRsp(ahandle, &ipSet);
*/
} }
...@@ -189,6 +178,8 @@ int main(int argc, char *argv[]) { ...@@ -189,6 +178,8 @@ int main(int argc, char *argv[]) {
} }
qhandle = taosOpenQueue(sizeof(SRpcMsg)); qhandle = taosOpenQueue(sizeof(SRpcMsg));
qset = taosOpenQset();
taosAddIntoQset(qset, qhandle, NULL);
processShellMsg(); processShellMsg();
......
#!/usr/bin/python3.7 #-----!/usr/bin/python3.7
################################################################### ###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc. # Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved. # All rights reserved.
...@@ -15,6 +15,8 @@ from __future__ import annotations # For type hinting before definition, ref: h ...@@ -15,6 +15,8 @@ from __future__ import annotations # For type hinting before definition, ref: h
import sys import sys
import os import os
import io
import signal
import traceback import traceback
# Require Python 3 # Require Python 3
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
...@@ -23,6 +25,7 @@ if sys.version_info[0] < 3: ...@@ -23,6 +25,7 @@ if sys.version_info[0] < 3:
import getopt import getopt
import argparse import argparse
import copy import copy
import requests
import threading import threading
import random import random
...@@ -30,10 +33,14 @@ import time ...@@ -30,10 +33,14 @@ import time
import logging import logging
import datetime import datetime
import textwrap import textwrap
import requests
from requests.auth import HTTPBasicAuth
from typing import List from typing import List
from typing import Dict from typing import Dict
from typing import Set from typing import Set
from typing import IO
from queue import Queue, Empty
from util.log import * from util.log import *
from util.dnodes import * from util.dnodes import *
...@@ -76,7 +83,10 @@ class WorkerThread: ...@@ -76,7 +83,10 @@ class WorkerThread:
# Let us have a DB connection of our own # Let us have a DB connection of our own
if ( gConfig.per_thread_db_connection ): # type: ignore if ( gConfig.per_thread_db_connection ): # type: ignore
self._dbConn = DbConn() # print("connector_type = {}".format(gConfig.connector_type))
self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest()
self._dbInUse = False # if "use db" was executed already
def logDebug(self, msg): def logDebug(self, msg):
logger.debug(" TRD[{}] {}".format(self._tid, msg)) logger.debug(" TRD[{}] {}".format(self._tid, msg))
...@@ -84,7 +94,14 @@ class WorkerThread: ...@@ -84,7 +94,14 @@ class WorkerThread:
def logInfo(self, msg): def logInfo(self, msg):
logger.info(" TRD[{}] {}".format(self._tid, msg)) logger.info(" TRD[{}] {}".format(self._tid, msg))
def dbInUse(self):
return self._dbInUse
def useDb(self):
if ( not self._dbInUse ):
self.execSql("use db")
self._dbInUse = True
def getTaskExecutor(self): def getTaskExecutor(self):
return self._tc.getTaskExecutor() return self._tc.getTaskExecutor()
...@@ -97,6 +114,7 @@ class WorkerThread: ...@@ -97,6 +114,7 @@ class WorkerThread:
logger.info("Starting to run thread: {}".format(self._tid)) logger.info("Starting to run thread: {}".format(self._tid))
if ( gConfig.per_thread_db_connection ): # type: ignore if ( gConfig.per_thread_db_connection ): # type: ignore
logger.debug("Worker thread openning database connection")
self._dbConn.open() self._dbConn.open()
self._doTaskLoop() self._doTaskLoop()
...@@ -118,12 +136,17 @@ class WorkerThread: ...@@ -118,12 +136,17 @@ class WorkerThread:
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
# Fetch a task from the Thread Coordinator
logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid)) logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask() task = tc.fetchTask()
# Execute such a task
logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__)) logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
task.execute(self) task.execute(self)
tc.saveExecutedTask(task) tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
self._dbInUse = False # there may be changes between steps
def verifyThreadSelf(self): # ensure we are called by this own thread def verifyThreadSelf(self): # ensure we are called by this own thread
if ( threading.get_ident() != self._thread.ident ): if ( threading.get_ident() != self._thread.ident ):
...@@ -163,6 +186,18 @@ class WorkerThread: ...@@ -163,6 +186,18 @@ class WorkerThread:
else: else:
return self._tc.getDbManager().getDbConn().execute(sql) return self._tc.getDbManager().getDbConn().execute(sql)
def querySql(self, sql): # TODO: expose DbConn directly
if ( gConfig.per_thread_db_connection ):
return self._dbConn.query(sql)
else:
return self._tc.getDbManager().getDbConn().query(sql)
def getQueryResult(self):
if ( gConfig.per_thread_db_connection ):
return self._dbConn.getQueryResult()
else:
return self._tc.getDbManager().getDbConn().getQueryResult()
def getDbConn(self): def getDbConn(self):
if ( gConfig.per_thread_db_connection ): if ( gConfig.per_thread_db_connection ):
return self._dbConn return self._dbConn
...@@ -175,8 +210,9 @@ class WorkerThread: ...@@ -175,8 +210,9 @@ class WorkerThread:
# else: # else:
# return self._tc.getDbState().getDbConn().query(sql) # return self._tc.getDbState().getDbConn().query(sql)
# The coordinator of all worker threads, mostly running in main thread
class ThreadCoordinator: class ThreadCoordinator:
def __init__(self, pool, dbManager): def __init__(self, pool: ThreadPool, dbManager):
self._curStep = -1 # first step is 0 self._curStep = -1 # first step is 0
self._pool = pool self._pool = pool
# self._wd = wd # self._wd = wd
...@@ -187,6 +223,7 @@ class ThreadCoordinator: ...@@ -187,6 +223,7 @@ class ThreadCoordinator:
self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats() self._execStats = ExecutionStats()
self._runStatus = MainExec.STATUS_RUNNING
def getTaskExecutor(self): def getTaskExecutor(self):
return self._te return self._te
...@@ -197,6 +234,10 @@ class ThreadCoordinator: ...@@ -197,6 +234,10 @@ class ThreadCoordinator:
def crossStepBarrier(self): def crossStepBarrier(self):
self._stepBarrier.wait() self._stepBarrier.wait()
def requestToStop(self):
self._runStatus = MainExec.STATUS_STOPPING
self._execStats.registerFailure("User Interruption")
def run(self): def run(self):
self._pool.createAndStartThreads(self) self._pool.createAndStartThreads(self)
...@@ -204,48 +245,73 @@ class ThreadCoordinator: ...@@ -204,48 +245,73 @@ class ThreadCoordinator:
self._curStep = -1 # not started yet self._curStep = -1 # not started yet
maxSteps = gConfig.max_steps # type: ignore maxSteps = gConfig.max_steps # type: ignore
self._execStats.startExec() # start the stop watch self._execStats.startExec() # start the stop watch
failed = False transitionFailed = False
while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 hasAbortedTask = False
while(self._curStep < maxSteps-1 and
(not transitionFailed) and
(self._runStatus==MainExec.STATUS_RUNNING) and
(not hasAbortedTask)): # maxStep==10, last curStep should be 9
if not gConfig.debug: if not gConfig.debug:
print(".", end="", flush=True) # print this only if we are not in debug mode print(".", end="", flush=True) # print this only if we are not in debug mode
logger.debug("[TRD] Main thread going to sleep") logger.debug("[TRD] Main thread going to sleep")
# Now ready to enter a step # Now main thread (that's us) is ready to enter a step
self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
self._stepBarrier.reset() # Other worker threads should now be at the "gate" self._stepBarrier.reset() # Other worker threads should now be at the "gate"
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
try: # We use this period to do house keeping work, when all worker threads are QUIET.
self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state hasAbortedTask = False
except taos.error.ProgrammingError as err: for task in self._executedTasks :
if ( err.msg == 'network unavailable' ): # broken DB connection if task.isAborted() :
logger.info("DB connection broken, execution failed") print("Task aborted: {}".format(task))
traceback.print_stack() hasAbortedTask = True
failed = True break
self._te = None # Not running any more
self._execStats.registerFailure("Broken DB Connection") if hasAbortedTask : # do transition only if tasks are error free
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop self._execStats.registerFailure("Aborted Task Encountered")
else: else:
raise try:
finally: sm = self._dbManager.getStateMachine()
pass logger.debug("[STT] starting transitions")
sm.transition(self._executedTasks) # at end of step, transiton the DB state
logger.debug("[STT] transition ended")
# Due to limitation (or maybe not) of the Python library, we cannot share connections across threads
if sm.hasDatabase() :
for t in self._pool.threadList:
logger.debug("[DB] use db for all worker threads")
t.useDb()
# t.execSql("use db") # main thread executing "use db" on behalf of every worker thread
except taos.error.ProgrammingError as err:
if ( err.msg == 'network unavailable' ): # broken DB connection
logger.info("DB connection broken, execution failed")
traceback.print_stack()
transitionFailed = True
self._te = None # Not running any more
self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
else:
raise
# finally:
# pass
self.resetExecutedTasks() # clear the tasks after we are done self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step # Get ready for next step
logger.debug("<-- Step {} finished".format(self._curStep)) logger.debug("<-- Step {} finished".format(self._curStep))
self._curStep += 1 # we are about to get into next step. TODO: race condition here! self._curStep += 1 # we are about to get into next step. TODO: race condition here!
logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
# A new TE for the new step # A new TE for the new step
if not failed: # only if not failed if not transitionFailed: # only if not failed
self._te = TaskExecutor(self._curStep) self._te = TaskExecutor(self._curStep)
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
self.tapAllThreads() self.tapAllThreads() # Worker threads will wake up at this point, and each execute it's own task
logger.debug("Main thread ready to finish up...") logger.debug("Main thread ready to finish up...")
if not failed: # only in regular situations if not transitionFailed: # only in regular situations
self.crossStepBarrier() # Cross it one last time, after all threads finish self.crossStepBarrier() # Cross it one last time, after all threads finish
self._stepBarrier.reset() self._stepBarrier.reset()
logger.debug("Main thread in exclusive zone...") logger.debug("Main thread in exclusive zone...")
...@@ -255,11 +321,17 @@ class ThreadCoordinator: ...@@ -255,11 +321,17 @@ class ThreadCoordinator:
logger.debug("Main thread joining all threads") logger.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish self._pool.joinAll() # Get all threads to finish
logger.info("All worker thread finished") logger.info("\nAll worker threads finished")
self._execStats.endExec() self._execStats.endExec()
def logStats(self): def printStats(self):
self._execStats.logStats() self._execStats.printStats()
def isFailed(self):
return self._execStats.isFailed()
def getExecStats(self):
return self._execStats
def tapAllThreads(self): # in a deterministic manner def tapAllThreads(self): # in a deterministic manner
wakeSeq = [] wakeSeq = []
...@@ -268,7 +340,7 @@ class ThreadCoordinator: ...@@ -268,7 +340,7 @@ class ThreadCoordinator:
wakeSeq.append(i) wakeSeq.append(i)
else: else:
wakeSeq.insert(0, i) wakeSeq.insert(0, i)
logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq))) logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq)))
# TODO: set dice seed to a deterministic value # TODO: set dice seed to a deterministic value
for i in wakeSeq: for i in wakeSeq:
self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
...@@ -306,7 +378,7 @@ class ThreadPool: ...@@ -306,7 +378,7 @@ class ThreadPool:
self.maxSteps = maxSteps self.maxSteps = maxSteps
# Internal class variables # Internal class variables
self.curStep = 0 self.curStep = 0
self.threadList = [] self.threadList = [] # type: List[WorkerThread]
# starting to run all the threads, in locking steps # starting to run all the threads, in locking steps
def createAndStartThreads(self, tc: ThreadCoordinator): def createAndStartThreads(self, tc: ThreadCoordinator):
...@@ -397,26 +469,39 @@ class LinearQueue(): ...@@ -397,26 +469,39 @@ class LinearQueue():
return ret return ret
class DbConn: class DbConn:
TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api"
TYPE_INVALID = "invalid"
@classmethod
def create(cls, connType):
if connType == cls.TYPE_NATIVE:
return DbConnNative()
elif connType == cls.TYPE_REST:
return DbConnRest()
else:
raise RuntimeError("Unexpected connection type: {}".format(connType))
@classmethod
def createNative(cls):
return cls.create(cls.TYPE_NATIVE)
@classmethod
def createRest(cls):
return cls.create(cls.TYPE_REST)
def __init__(self): def __init__(self):
self._conn = None
self._cursor = None
self.isOpen = False self.isOpen = False
self._type = self.TYPE_INVALID
def open(self): # Open connection
def open(self):
if ( self.isOpen ): if ( self.isOpen ):
raise RuntimeError("Cannot re-open an existing DB connection") raise RuntimeError("Cannot re-open an existing DB connection")
cfgPath = "../../build/test/cfg" # below implemented by child classes
self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable self.openByType()
self._cursor = self._conn.cursor()
# Get the connection/cursor ready logger.debug("[DB] data connection opened, type = {}".format(self._type))
self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # note we do this in _findCurrenState
# Open connection
self._tdSql = TDSql()
self._tdSql.init(self._cursor)
self.isOpen = True self.isOpen = True
def resetDb(self): # reset the whole database, etc. def resetDb(self): # reset the whole database, etc.
...@@ -424,17 +509,128 @@ class DbConn: ...@@ -424,17 +509,128 @@ class DbConn:
raise RuntimeError("Cannot reset database until connection is open") raise RuntimeError("Cannot reset database until connection is open")
# self._tdSql.prepare() # Recreate database, etc. # self._tdSql.prepare() # Recreate database, etc.
self._cursor.execute('drop database if exists db') self.execute('drop database if exists db')
logger.debug("Resetting DB, dropped database") logger.debug("Resetting DB, dropped database")
# self._cursor.execute('create database db') # self._cursor.execute('create database db')
# self._cursor.execute('use db') # self._cursor.execute('use db')
# tdSql.execute('show databases') # tdSql.execute('show databases')
def queryScalar(self, sql) -> int :
return self._queryAny(sql)
def queryString(self, sql) -> str :
return self._queryAny(sql)
def _queryAny(self, sql) : # actual query result as an int
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1 :
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0]
def execute(self, sql):
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultCols(self):
raise RuntimeError("Unexpected execution, should be overriden")
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
class DbConnRest(DbConn):
def __init__(self):
super().__init__()
self._type = self.TYPE_REST
self._url = "http://localhost:6020/rest/sql" # fixed for now
self._result = None
def openByType(self): # Open connection
pass # do nothing, always open
def close(self):
if ( not self.isOpen ):
raise RuntimeError("Cannot clean up database until connection is open")
# Do nothing for REST
logger.debug("[DB] REST Database connection closed")
self.isOpen = False
def _doSql(self, sql):
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
rj = r.json()
# Sanity check for the "Json Result"
if (not 'status' in rj):
raise RuntimeError("No status in REST response")
if rj['status'] == 'error': # clearly reported error
if (not 'code' in rj): # error without code
raise RuntimeError("REST error return without code")
errno = rj['code'] # May need to massage this in the future
# print("Raising programming error with REST return: {}".format(rj))
raise taos.error.ProgrammingError(rj['desc'], errno) # todo: check existance of 'desc'
if rj['status'] != 'succ': # better be this
raise RuntimeError("Unexpected REST return status: {}".format(rj['status']))
nRows = rj['rows'] if ('rows' in rj) else 0
self._result = rj
return nRows
def execute(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot execute database commands until connection is open")
logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
nRows = self._doSql(sql)
logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql) : # return rows affected
return self.execute(sql)
def getQueryResult(self):
return self._result['data']
def getResultRows(self):
print(self._result)
raise RuntimeError("TBD")
# return self._tdSql.queryRows
def getResultCols(self):
print(self._result)
raise RuntimeError("TBD")
class DbConnNative(DbConn):
def __init__(self):
super().__init__()
self._type = self.TYPE_REST
self._conn = None
self._cursor = None
def openByType(self): # Open connection
cfgPath = "../../build/test/cfg"
self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every step
# Open connection
self._tdSql = TDSql()
self._tdSql.init(self._cursor)
def close(self): def close(self):
if ( not self.isOpen ): if ( not self.isOpen ):
raise RuntimeError("Cannot clean up database until connection is open") raise RuntimeError("Cannot clean up database until connection is open")
self._tdSql.close() self._tdSql.close()
logger.debug("[DB] Database connection closed")
self.isOpen = False self.isOpen = False
def execute(self, sql): def execute(self, sql):
...@@ -450,29 +646,19 @@ class DbConn: ...@@ -450,29 +646,19 @@ class DbConn:
raise RuntimeError("Cannot query database until connection is open") raise RuntimeError("Cannot query database until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql)) logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.query(sql) nRows = self._tdSql.query(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows return nRows
# results are in: return self._tdSql.queryResult # results are in: return self._tdSql.queryResult
def getQueryResult(self): def getQueryResult(self):
return self._tdSql.queryResult return self._tdSql.queryResult
def _queryAny(self, sql) : # actual query result as an int def getResultRows(self):
if ( not self.isOpen ): return self._tdSql.queryRows
raise RuntimeError("Cannot query database until connection is open")
tSql = self._tdSql
nRows = tSql.query(sql)
if nRows != 1 :
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
if tSql.queryRows != 1 or tSql.queryCols != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return tSql.queryResult[0][0]
def queryScalar(self, sql) -> int : def getResultCols(self):
return self._queryAny(sql) return self._tdSql.queryCols
def queryString(self, sql) -> str :
return self._queryAny(sql)
class AnyState: class AnyState:
STATE_INVALID = -1 STATE_INVALID = -1
...@@ -620,10 +806,10 @@ class StateDbOnly(AnyState): ...@@ -620,10 +806,10 @@ class StateDbOnly(AnyState):
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
# self.assertAtMostOneSuccess(tasks, DropDbTask) # self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY # self._state = self.STATE_EMPTY
if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success # if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table # # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
if ( not self.hasTask(tasks, TaskDropSuperTable) ): # if ( not self.hasTask(tasks, TaskDropSuperTable) ):
self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything # self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# self.assertNoTask(tasks, DropDbTask) # should have have tried # self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail # # can't say there's add-data attempts, since they may all fail
...@@ -648,7 +834,9 @@ class StateSuperTableOnly(AnyState): ...@@ -648,7 +834,9 @@ class StateSuperTableOnly(AnyState):
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table
self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) #self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
self.hasSuccess(tasks, TaskCreateSuperTable) # we must have had recreted it
# self._state = self.STATE_DB_ONLY # self._state = self.STATE_DB_ONLY
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
# self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases
...@@ -680,18 +868,19 @@ class StateHasData(AnyState): ...@@ -680,18 +868,19 @@ class StateHasData(AnyState):
self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task
self.hasSuccess(tasks, TaskDropSuperTable) self.hasSuccess(tasks, TaskDropSuperTable)
# self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted # elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
self.assertNoTask(tasks, TaskDropDb) # self.assertNoTask(tasks, TaskDropDb)
self.assertNoTask(tasks, TaskDropSuperTable) # self.assertNoTask(tasks, TaskDropSuperTable)
self.assertNoTask(tasks, TaskAddData) # self.assertNoTask(tasks, TaskAddData)
# self.hasSuccess(tasks, DeleteDataTasks) # self.hasSuccess(tasks, DeleteDataTasks)
else: # should be STATE_HAS_DATA else: # should be STATE_HAS_DATA
self.assertNoTask(tasks, TaskDropDb) if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one
self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it
if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table
self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it
# self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
class StateMechine : class StateMechine:
def __init__(self, dbConn): def __init__(self, dbConn):
self._dbConn = dbConn self._dbConn = dbConn
self._curState = self._findCurrentState() # starting state self._curState = self._findCurrentState() # starting state
...@@ -700,8 +889,17 @@ class StateMechine : ...@@ -700,8 +889,17 @@ class StateMechine :
def getCurrentState(self): def getCurrentState(self):
return self._curState return self._curState
def hasDatabase(self):
return self._curState.canDropDb() # ha, can drop DB means it has one
# May be slow, use cautionsly... # May be slow, use cautionsly...
def getTaskTypes(self): # those that can run (directly/indirectly) from the current state def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
def typesToStrings(types):
ss = []
for t in types:
ss.append(t.__name__)
return ss
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
firstTaskTypes = [] firstTaskTypes = []
for tc in allTaskClasses: for tc in allTaskClasses:
...@@ -720,7 +918,7 @@ class StateMechine : ...@@ -720,7 +918,7 @@ class StateMechine :
if len(taskTypes) <= 0: if len(taskTypes) <= 0:
raise RuntimeError("No suitable task types found for state: {}".format(self._curState)) raise RuntimeError("No suitable task types found for state: {}".format(self._curState))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes)) logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, typesToStrings(taskTypes)))
return taskTypes return taskTypes
def _findCurrentState(self): def _findCurrentState(self):
...@@ -730,7 +928,7 @@ class StateMechine : ...@@ -730,7 +928,7 @@ class StateMechine :
# logger.debug("Found EMPTY state") # logger.debug("Found EMPTY state")
logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty() return StateEmpty()
dbc.execute("use db") # did not do this when openning connection dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own
if dbc.query("show tables") == 0 : # no tables if dbc.query("show tables") == 0 : # no tables
# logger.debug("Found DB ONLY state") # logger.debug("Found DB ONLY state")
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
...@@ -746,6 +944,7 @@ class StateMechine : ...@@ -746,6 +944,7 @@ class StateMechine :
def transition(self, tasks): def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
logger.debug("[STT] Starting State: {}".format(self._curState))
return # do nothing return # do nothing
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
...@@ -807,14 +1006,14 @@ class DbManager(): ...@@ -807,14 +1006,14 @@ class DbManager():
self._lock = threading.RLock() self._lock = threading.RLock()
# self.openDbServerConnection() # self.openDbServerConnection()
self._dbConn = DbConn() self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest()
try: try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if ( err.msg == 'client disconnected' ): # cannot open DB connection if ( err.msg == 'client disconnected' ): # cannot open DB connection
print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit() sys.exit(2)
else: else:
raise raise
except: except:
...@@ -829,7 +1028,7 @@ class DbManager(): ...@@ -829,7 +1028,7 @@ class DbManager():
def getDbConn(self): def getDbConn(self):
return self._dbConn return self._dbConn
def getStateMachine(self): def getStateMachine(self) -> StateMechine :
return self._stateMachine return self._stateMachine
# def getState(self): # def getState(self):
...@@ -869,8 +1068,11 @@ class DbManager(): ...@@ -869,8 +1068,11 @@ class DbManager():
def getNextTick(self): def getNextTick(self):
with self._lock: # prevent duplicate tick with self._lock: # prevent duplicate tick
self._lastTick += datetime.timedelta(0, 1) # add one second to it if Dice.throw(10) == 0 : # 1 in 10 chance
return self._lastTick return self._lastTick + datetime.timedelta(0, -100)
else: # regular
self._lastTick += datetime.timedelta(0, 1) # add one second to it
return self._lastTick
def getNextInt(self): def getNextInt(self):
with self._lock: with self._lock:
...@@ -894,15 +1096,60 @@ class DbManager(): ...@@ -894,15 +1096,60 @@ class DbManager():
self._dbConn.close() self._dbConn.close()
class TaskExecutor(): class TaskExecutor():
class BoundedList:
def __init__(self, size = 10):
self._size = size
self._list = []
def add(self, n: int) :
if not self._list: # empty
self._list.append(n)
return
# now we should insert
nItems = len(self._list)
insPos = 0
for i in range(nItems):
insPos = i
if n <= self._list[i] : # smaller than this item, time to insert
break # found the insertion point
insPos += 1 # insert to the right
if insPos == 0 : # except for the 1st item, # TODO: elimiate first item as gating item
return # do nothing
# print("Inserting at postion {}, value: {}".format(insPos, n))
self._list.insert(insPos, n) # insert
newLen = len(self._list)
if newLen <= self._size :
return # do nothing
elif newLen == (self._size + 1) :
del self._list[0] # remove the first item
else :
raise RuntimeError("Corrupt Bounded List")
def __str__(self):
return repr(self._list)
_boundedList = BoundedList()
def __init__(self, curStep): def __init__(self, curStep):
self._curStep = curStep self._curStep = curStep
@classmethod
def getBoundedList(cls):
return cls._boundedList
def getCurStep(self): def getCurStep(self):
return self._curStep return self._curStep
def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
task.execute(wt) task.execute(wt)
def recordDataMark(self, n: int):
# print("[{}]".format(n), end="", flush=True)
self._boundedList.add(n)
# def logInfo(self, msg): # def logInfo(self, msg):
# logger.info(" T[{}.x]: ".format(self._curStep) + msg) # logger.info(" T[{}.x]: ".format(self._curStep) + msg)
...@@ -922,6 +1169,7 @@ class Task(): ...@@ -922,6 +1169,7 @@ class Task():
self._dbManager = dbManager self._dbManager = dbManager
self._workerThread = None self._workerThread = None
self._err = None self._err = None
self._aborted = False
self._curStep = None self._curStep = None
self._numRows = None # Number of rows affected self._numRows = None # Number of rows affected
...@@ -930,10 +1178,14 @@ class Task(): ...@@ -930,10 +1178,14 @@ class Task():
# logger.debug("Creating new task {}...".format(self._taskNum)) # logger.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats self._execStats = execStats
self._lastSql = "" # last SQL executed/attempted
def isSuccess(self): def isSuccess(self):
return self._err == None return self._err == None
def isAborted(self):
return self._aborted
def clone(self): # TODO: why do we need this again? def clone(self): # TODO: why do we need this again?
newTask = self.__class__(self._dbManager, self._execStats) newTask = self.__class__(self._dbManager, self._execStats)
return newTask return newTask
...@@ -960,10 +1212,31 @@ class Task(): ...@@ -960,10 +1212,31 @@ class Task():
try: try:
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
self._err = err if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600,
except: 1000 # REST catch-all error
self.logDebug("[=] Unexpected exception") ]) : # allowed errors
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
print("_", end="", flush=True)
self._err = err
else:
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)
self.logDebug(errMsg)
if gConfig.debug :
raise # so that we see full stack
else: # non-debug
print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
"----------------------------\n")
# sys.exit(-1)
self._err = err
self._aborted = True
except Exception as e :
self.logInfo("Non-TAOS exception encountered")
self._err = e
self._aborted = True
traceback.print_exc()
except :
self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
raise raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
...@@ -971,8 +1244,21 @@ class Task(): ...@@ -971,8 +1244,21 @@ class Task():
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
def execSql(self, sql): def execSql(self, sql):
self._lastSql = sql
return self._dbManager.execute(sql) return self._dbManager.execute(sql)
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.execSql(sql)
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.querySql(sql)
def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread
return wt.getQueryResult()
class ExecutionStats: class ExecutionStats:
def __init__(self): def __init__(self):
...@@ -987,6 +1273,12 @@ class ExecutionStats: ...@@ -987,6 +1273,12 @@ class ExecutionStats:
self._failed = False self._failed = False
self._failureReason = None self._failureReason = None
def __str__(self):
return "[ExecStats: _failed={}, _failureReason={}".format(self._failed, self._failureReason)
def isFailed(self):
return self._failed == True
def startExec(self): def startExec(self):
self._execStartTime = time.time() self._execStartTime = time.time()
...@@ -1018,7 +1310,7 @@ class ExecutionStats: ...@@ -1018,7 +1310,7 @@ class ExecutionStats:
self._failed = True self._failed = True
self._failureReason = reason self._failureReason = reason
def logStats(self): def printStats(self):
logger.info("----------------------------------------------------------------------") logger.info("----------------------------------------------------------------------")
logger.info("| Crash_Gen test {}, with the following stats:". logger.info("| Crash_Gen test {}, with the following stats:".
format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED")) format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
...@@ -1033,11 +1325,17 @@ class ExecutionStats: ...@@ -1033,11 +1325,17 @@ class ExecutionStats:
logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime)) logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny)) logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime)) logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
logger.info("----------------------------------------------------------------------") logger.info("----------------------------------------------------------------------")
class StateTransitionTask(Task): class StateTransitionTask(Task):
LARGE_NUMBER_OF_TABLES = 35
SMALL_NUMBER_OF_TABLES = 3
LARGE_NUMBER_OF_RECORDS = 50
SMALL_NUMBER_OF_RECORDS = 3
@classmethod @classmethod
def getInfo(cls): # each sub class should supply their own information def getInfo(cls): # each sub class should supply their own information
raise RuntimeError("Overriding method expected") raise RuntimeError("Overriding method expected")
...@@ -1060,6 +1358,10 @@ class StateTransitionTask(Task): ...@@ -1060,6 +1358,10 @@ class StateTransitionTask(Task):
# return state.getValue() in cls.getBeginStates() # return state.getValue() in cls.getBeginStates()
raise RuntimeError("must be overriden") raise RuntimeError("must be overriden")
@classmethod
def getRegTableName(cls, i):
return "db.reg_table_{}".format(i)
def execute(self, wt: WorkerThread): def execute(self, wt: WorkerThread):
super().execute(wt) super().execute(wt)
...@@ -1073,7 +1375,7 @@ class TaskCreateDb(StateTransitionTask): ...@@ -1073,7 +1375,7 @@ class TaskCreateDb(StateTransitionTask):
return state.canCreateDb() return state.canCreateDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db") self.execWtSql(wt, "create database db")
class TaskDropDb(StateTransitionTask): class TaskDropDb(StateTransitionTask):
@classmethod @classmethod
...@@ -1085,7 +1387,7 @@ class TaskDropDb(StateTransitionTask): ...@@ -1085,7 +1387,7 @@ class TaskDropDb(StateTransitionTask):
return state.canDropDb() return state.canDropDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("drop database db") self.execWtSql(wt, "drop database db")
logger.debug("[OPS] database dropped at {}".format(time.time())) logger.debug("[OPS] database dropped at {}".format(time.time()))
class TaskCreateSuperTable(StateTransitionTask): class TaskCreateSuperTable(StateTransitionTask):
...@@ -1098,8 +1400,13 @@ class TaskCreateSuperTable(StateTransitionTask): ...@@ -1098,8 +1400,13 @@ class TaskCreateSuperTable(StateTransitionTask):
return state.canCreateFixedSuperTable() return state.canCreateFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName() if not wt.dbInUse(): # no DB yet, to the best of our knowledge
wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) logger.debug("Skipping task, no DB yet")
return
tblName = self._dbManager.getFixedSuperTableName()
# wt.execSql("use db") # should always be in place
self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that automatically # No need to create the regular tables, INSERT will do that automatically
...@@ -1114,16 +1421,16 @@ class TaskReadData(StateTransitionTask): ...@@ -1114,16 +1421,16 @@ class TaskReadData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTbName = self._dbManager.getFixedSuperTableName() sTbName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn() self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
dbc.close() wt.getDbConn().close()
dbc.open() wt.getDbConn().open()
else: else:
rTables = dbc.getQueryResult() rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult()
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
for rTbName in rTables : # regular tables for rTbName in rTables : # regular tables
dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure self.execWtSql(wt, "select * from db.{}".format(rTbName[0]))
# tdSql.query(" cars where tbname in ('carzero', 'carone')") # tdSql.query(" cars where tbname in ('carzero', 'carone')")
...@@ -1137,8 +1444,33 @@ class TaskDropSuperTable(StateTransitionTask): ...@@ -1137,8 +1444,33 @@ class TaskDropSuperTable(StateTransitionTask):
return state.canDropFixedSuperTable() return state.canDropFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName() # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
wt.execSql("drop table db.{}".format(tblName)) if Dice.throw(2) == 0 :
tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq)
tickOutput = False # if we have spitted out a "d" character for "drop regular table"
isSuccess = True
for i in tblSeq:
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
try:
self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correcting for strange error number scheme
if ( errno2 in [0x362]) : # mnode invalid table name
isSuccess = False
logger.debug("[DB] Acceptable error when dropping a table")
continue # try to delete next regular table
if (not tickOutput):
tickOutput = True # Print only one time
if isSuccess :
print("d", end="", flush=True)
else:
print("f", end="", flush=True)
# Drop the super table itself
tblName = self._dbManager.getFixedSuperTableName()
self.execWtSql(wt, "drop table db.{}".format(tblName))
class TaskAlterTags(StateTransitionTask): class TaskAlterTags(StateTransitionTask):
@classmethod @classmethod
...@@ -1153,20 +1485,18 @@ class TaskAlterTags(StateTransitionTask): ...@@ -1153,20 +1485,18 @@ class TaskAlterTags(StateTransitionTask):
tblName = self._dbManager.getFixedSuperTableName() tblName = self._dbManager.getFixedSuperTableName()
dice = Dice.throw(4) dice = Dice.throw(4)
if dice == 0 : if dice == 0 :
wt.execSql("alter table db.{} add tag extraTag int".format(tblName)) sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1 : elif dice == 1 :
wt.execSql("alter table db.{} drop tag extraTag".format(tblName)) sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2 : elif dice == 2 :
wt.execSql("alter table db.{} drop tag newTag".format(tblName)) sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3 else: # dice == 3
wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName)) sql = "alter table db.{} change tag extraTag newTag".format(tblName)
self.execWtSql(wt, sql)
class TaskAddData(StateTransitionTask): class TaskAddData(StateTransitionTask):
activeTable : Set[int] = set() # Track which table is being actively worked on activeTable : Set[int] = set() # Track which table is being actively worked on
LARGE_NUMBER_OF_TABLES = 35
SMALL_NUMBER_OF_TABLES = 3
LARGE_NUMBER_OF_RECORDS = 50
SMALL_NUMBER_OF_RECORDS = 3
# We use these two files to record operations to DB, useful for power-off tests # We use these two files to record operations to DB, useful for power-off tests
fAddLogReady = None fAddLogReady = None
...@@ -1192,7 +1522,7 @@ class TaskAddData(StateTransitionTask): ...@@ -1192,7 +1522,7 @@ class TaskAddData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbManager ds = self._dbManager
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this # wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
random.shuffle(tblSeq) random.shuffle(tblSeq)
for i in tblSeq: for i in tblSeq:
...@@ -1202,10 +1532,10 @@ class TaskAddData(StateTransitionTask): ...@@ -1202,10 +1532,10 @@ class TaskAddData(StateTransitionTask):
print("x", end="", flush=True) print("x", end="", flush=True)
else: else:
self.activeTable.add(i) # marking it active self.activeTable.add(i) # marking it active
# No need to shuffle data sequence, unless later we decide to do non-increment insertion # No need to shuffle data sequence, unless later we decide to do non-increment insertion
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table
nextInt = ds.getNextInt() nextInt = ds.getNextInt()
regTableName = "db.reg_table_{}".format(i)
if gConfig.record_ops: if gConfig.record_ops:
self.prepToRecordOps() self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
...@@ -1216,7 +1546,9 @@ class TaskAddData(StateTransitionTask): ...@@ -1216,7 +1546,9 @@ class TaskAddData(StateTransitionTask):
ds.getFixedSuperTableName(), ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(), ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), nextInt) ds.getNextTick(), nextInt)
wt.execSql(sql) self.execWtSql(wt, sql)
# Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt)
if gConfig.record_ops: if gConfig.record_ops:
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.flush() self.fAddLogDone.flush()
...@@ -1285,17 +1617,213 @@ class LoggingFilter(logging.Filter): ...@@ -1285,17 +1617,213 @@ class LoggingFilter(logging.Filter):
if ( record.levelno >= logging.INFO ) : if ( record.levelno >= logging.INFO ) :
return True # info or above always log return True # info or above always log
msg = record.msg
# print("type = {}, value={}".format(type(msg), msg))
# sys.exit()
# Commenting out below to adjust... # Commenting out below to adjust...
# if msg.startswith("[TRD]"): # if msg.startswith("[TRD]"):
# return False # return False
return True return True
class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager:
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
self.ioThread = None
self.subProcess = None
self.shouldStop = False
self.status = MainExec.STATUS_RUNNING
def svcOutputReader(self, out: IO, queue):
# print("This is the svcOutput Reader...")
for line in out : # iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
queue.put(line.rstrip()) # get rid of new lines
print("No more output from incoming IO") # meaning sub process must have died
out.close()
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...")
self.subProcess.send_signal(signal.SIGINT)
self.shouldStop = True
self.joinIoThread()
def joinIoThread(self):
if self.ioThread :
self.ioThread.join()
self.ioThread = None
def run(self):
ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1']
self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True)
q = Queue()
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q))
self.ioThread.daemon = True # thread dies with the program
self.ioThread.start()
# proc = subprocess.Popen(['echo', '"to stdout"'],
# stdout=subprocess.PIPE,
# )
# stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value)))
while True :
try:
line = q.get_nowait() # getting output at fast speed
except Empty:
# print('no output yet')
time.sleep(2.3) # wait only if there's no output
else: # got line
print(line)
# print("----end of iteration----")
if self.shouldStop:
print("Ending main Svc thread")
break
print("end of loop")
self.joinIoThread()
print("Finished")
class ClientManager:
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
self.status = MainExec.STATUS_RUNNING
self.tc = None
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...")
self.tc.requestToStop()
def _printLastNumbers(self): # to verify data durability
dbManager = DbManager(resetDb=False)
dbc = dbManager.getDbConn()
if dbc.query("show databases") == 0 : # no databae
return
if dbc.query("show tables") == 0 : # no tables
return
dbc.execute("use db")
sTbName = dbManager.getFixedSuperTableName()
# get all regular tables
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
rTables = dbc.getQueryResult()
bList = TaskExecutor.BoundedList()
for rTbName in rTables : # regular tables
dbc.query("select speed from db.{}".format(rTbName[0]))
numbers = dbc.getQueryResult()
for row in numbers :
# print("<{}>".format(n), end="", flush=True)
bList.add(row[0])
print("Top numbers in DB right now: {}".format(bList))
print("TDengine client execution is about to start in 2 seconds...")
time.sleep(2.0)
dbManager = None # release?
def prepare(self):
self._printLastNumbers()
def run(self):
self._printLastNumbers()
dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
self.tc.run()
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
def conclude(self):
self.tc.printStats()
self.tc.getDbManager().cleanUp()
class MainExec:
STATUS_RUNNING = 1
STATUS_STOPPING = 2
# STATUS_STOPPED = 3 # Not used yet
@classmethod
def runClient(cls):
clientManager = ClientManager()
return clientManager.run()
@classmethod
def runService(cls):
svcManager = SvcManager()
svcManager.run()
@classmethod
def runTemp(cls): # for debugging purposes
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName()
# dbc.execute("create database if not exists db")
# if not dbState.getState().equals(StateEmpty()):
# dbc.execute("use db")
# rTables = None
# try: # the super table may not exist
# sql = "select TBNAME from db.{}".format(sTbName)
# logger.info("Finding out tables in super table: {}".format(sql))
# dbc.query(sql) # TODO: analyze result set later
# logger.info("Fetching result")
# rTables = dbc.getQueryResult()
# logger.info("Result: {}".format(rTables))
# except taos.error.ProgrammingError as err:
# logger.info("Initial Super table OPS error: {}".format(err))
# # sys.exit()
# if ( not rTables == None):
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
# try:
# for rTbName in rTables : # regular tables
# ds = dbState
# logger.info("Inserting into table: {}".format(rTbName[0]))
# sql = "insert into db.{} values ('{}', {});".format(
# rTbName[0],
# ds.getNextTick(), ds.getNextInt())
# dbc.execute(sql)
# for rTbName in rTables : # regular tables
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# logger.info("Initial READING operation is successful")
# except taos.error.ProgrammingError as err:
# logger.info("Initial WRITE/READ error: {}".format(err))
# Sandbox testing code
# dbc = dbState.getDbConn()
# while True:
# rows = dbc.query("show databases")
# print("Rows: {}, time={}".format(rows, time.time()))
return
def main(): def main():
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
...@@ -1308,93 +1836,55 @@ def main(): ...@@ -1308,93 +1836,55 @@ def main():
2. You run the server there before this script: ./build/bin/taosd -c test/cfg 2. You run the server there before this script: ./build/bin/taosd -c test/cfg
''')) '''))
parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
help='Connector type to use: native, rest, or mixed (default: 10)')
parser.add_argument('-d', '--debug', action='store_true', parser.add_argument('-d', '--debug', action='store_true',
help='Turn on DEBUG mode for more logging (default: false)') help='Turn on DEBUG mode for more logging (default: false)')
parser.add_argument('-e', '--run-tdengine', action='store_true',
help='Run TDengine service in foreground (default: false)')
parser.add_argument('-l', '--larger-data', action='store_true', parser.add_argument('-l', '--larger-data', action='store_true',
help='Write larger amount of data during write operations (default: false)') help='Write larger amount of data during write operations (default: false)')
parser.add_argument('-p', '--per-thread-db-connection', action='store_true', parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
help='Use a single shared db connection (default: false)') help='Use a single shared db connection (default: false)')
parser.add_argument('-r', '--record-ops', action='store_true', parser.add_argument('-r', '--record-ops', action='store_true',
help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
help='Maximum number of steps to run (default: 100)') help='Maximum number of steps to run (default: 100)')
parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
help='Number of threads to run (default: 10)') help='Number of threads to run (default: 10)')
global gConfig global gConfig
gConfig = parser.parse_args() gConfig = parser.parse_args()
if len(sys.argv) == 1:
parser.print_help()
sys.exit()
# if len(sys.argv) == 1:
# parser.print_help()
# sys.exit()
# Logging Stuff
global logger global logger
logger = logging.getLogger('CrashGen') _logger = logging.getLogger('CrashGen') # real logger
logger.addFilter(LoggingFilter()) _logger.addFilter(LoggingFilter())
ch = logging.StreamHandler()
_logger.addHandler(ch)
logger = MyLoggingAdapter(_logger, []) # Logging adapter, to be used as a logger
if ( gConfig.debug ): if ( gConfig.debug ):
logger.setLevel(logging.DEBUG) # default seems to be INFO logger.setLevel(logging.DEBUG) # default seems to be INFO
else: else:
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)
# resetDb = False # DEBUG only
# dbState = DbState(resetDb) # DBEUG only!
dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice
tc = ThreadCoordinator(
ThreadPool(gConfig.num_threads, gConfig.max_steps),
# WorkDispatcher(dbState), # Obsolete?
dbManager
)
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName()
# dbc.execute("create database if not exists db")
# if not dbState.getState().equals(StateEmpty()):
# dbc.execute("use db")
# rTables = None
# try: # the super table may not exist
# sql = "select TBNAME from db.{}".format(sTbName)
# logger.info("Finding out tables in super table: {}".format(sql))
# dbc.query(sql) # TODO: analyze result set later
# logger.info("Fetching result")
# rTables = dbc.getQueryResult()
# logger.info("Result: {}".format(rTables))
# except taos.error.ProgrammingError as err:
# logger.info("Initial Super table OPS error: {}".format(err))
# # sys.exit()
# if ( not rTables == None):
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
# try:
# for rTbName in rTables : # regular tables
# ds = dbState
# logger.info("Inserting into table: {}".format(rTbName[0]))
# sql = "insert into db.{} values ('{}', {});".format(
# rTbName[0],
# ds.getNextTick(), ds.getNextInt())
# dbc.execute(sql)
# for rTbName in rTables : # regular tables
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# logger.info("Initial READING operation is successful")
# except taos.error.ProgrammingError as err:
# logger.info("Initial WRITE/READ error: {}".format(err))
# Run server or client
if gConfig.run_tdengine : # run server
MainExec.runService()
else :
return MainExec.runClient()
# Sandbox testing code
# dbc = dbState.getDbConn()
# while True:
# rows = dbc.query("show databases")
# print("Rows: {}, time={}".format(rows, time.time()))
tc.run()
tc.logStats()
dbManager.cleanUp()
# logger.info("Crash_Gen execution finished") # logger.info("Crash_Gen execution finished")
if __name__ == "__main__": if __name__ == "__main__":
main() exitCode = main()
# print("Exiting with code: {}".format(exitCode))
sys.exit(exitCode)
...@@ -38,4 +38,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 ...@@ -38,4 +38,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib
# Now we are all let, and let's see if we can find a crash. Note we pass all params # Now we are all let, and let's see if we can find a crash. Note we pass all params
./crash_gen.py $@ python3 ./crash_gen.py $@
sql connect
$db = db1
$stb = stb1
print =============== client1_0:
sql use $db
$tblNum = 1000
$i = 1
while $i < $tblNum
$tb = tb . $i
sql create table $tb using $stb tags ($i, 'abcd')
$i = $i + 1
endw
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode2 -c walLevel -v 1
system sh/cfg.sh -n dnode3 -c walLevel -v 1
system sh/cfg.sh -n dnode4 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c alternativeRole -v 0
system sh/cfg.sh -n dnode2 -c alternativeRole -v 0
system sh/cfg.sh -n dnode3 -c alternativeRole -v 0
system sh/cfg.sh -n dnode4 -c alternativeRole -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
print ============== step1: start dnode1/dnode2/dnode3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sleep 3000
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
sleep 3000
print ============== step2: create db1 with replica 3
$db = db1
print create database $db replica 3
#sql create database $db replica 3 maxTables $totalTableNum
sql create database $db replica 3
sql use $db
print ============== step3: create stable stb1
$stb = stb1
sql create table $stb (ts timestamp, c1 int, c2 int) tags(t1 int, t2 binary(8))
print ============== step4: start 10 client1/ 10 client2/ 10 client3/ 10 client4/ 1 client5
run_back unique/cluster/client1_0.sim
#run_back unique/cluster/client1_1.sim
#run_back unique/big_cluster/client1_2.sim
#run_back unique/big_cluster/client1_3.sim
#run_back unique/big_cluster/client1_4.sim
#run_back unique/big_cluster/client1_5.sim
#run_back unique/big_cluster/client1_6.sim
#run_back unique/big_cluster/client1_7.sim
#run_back unique/big_cluster/client1_8.sim
#run_back unique/big_cluster/client1_9.sim
print wait for a while to let clients start insert data
sleep 5000
$loop_cnt = 0
loop_cluster_do:
print **** **** **** START loop cluster do **** **** **** ****
print ============== step5: start dnode4 and add into cluster, then wait dnode4 ready
system sh/exec.sh -n dnode4 -s start
sql create dnode $hostname4
wait_dnode4_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 4 then
sleep 2000
goto wait_dnode4_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do 2**** **** **** ****
return
endi
if $dnode4Status != ready then
sleep 2000
goto wait_dnode4_ready_0
endi
print ============== step6: stop and drop dnode1, then remove data dir of dnode1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
$cnt = 0
wait_dnode1_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 4 then
sleep 2000
goto wait_dnode1_offline_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode1Status = $data4_1
elif $loop_cnt == 1 then
$dnode1Status = $data4_5
elif $loop_cnt == 2 then
$dnode1Status = $data4_7
elif $loop_cnt == 3 then
$dnode1Status = $data4_9
else then
print **** **** **** END loop cluster do 1**** **** **** ****
return
endi
if $dnode1Status != offline then
sleep 2000
goto wait_dnode1_offline_0
endi
sql drop dnode $hostname1
system rm -rf ../../../sim/dnode1
print ============== step7: stop dnode2, because mnodes < 50%, so clusert don't provide services
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sql show dnodes -x wait_dnode2_offline_0
if $rows != 3 then
sleep 2000
goto wait_dnode2_offline_0
endi
wait_dnode2_offline_0:
#$cnt = 0
#wait_dnode2_offline_0:
#$cnt = $cnt + 1
#if $cnt == 10 then
# return -1
#endi
#sql show dnodes -x wait_dnode2_offline_0
#if $rows != 3 then
# sleep 2000
# goto wait_dnode2_offline_0
#endi
#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#$dnode1Status = $data4_1
#$dnode2Status = $data4_2
#$dnode3Status = $data4_3
#$dnode4Status = $data4_4
#
#if $dnode2Status != offline then
# sleep 2000
# goto wait_dnode1_offline_0
#endi
print ============== step8: restart dnode2, then wait sync end
system sh/exec.sh -n dnode2 -s start
$cnt = 0
wait_dnode2_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode2_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode2Status != ready then
sleep 2000
goto wait_dnode2_ready_0
endi
print ============== step9: stop dnode3, then wait sync end
system sh/exec.sh -n dnode3 -s stop -x SIGINT
$cnt = 0
wait_dnode3_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode3_offline_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode3Status != offline then
sleep 2000
goto wait_dnode3_offline_0
endi
print ============== step10: restart dnode3, then wait sync end
system sh/exec.sh -n dnode3 -s start
$cnt = 0
wait_dnode3_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode3_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode3Status != ready then
sleep 2000
goto wait_dnode3_ready_0
endi
print ============== step11: stop dnode4, then wait sync end
system sh/exec.sh -n dnode4 -s stop -x SIGINT
$cnt = 0
wait_dnode4_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode4_offline_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do 2**** **** **** ****
return
endi
if $dnode4Status != offline then
sleep 2000
goto wait_dnode4_offline_0
endi
print ============== step12: restart dnode4, then wait sync end
system sh/exec.sh -n dnode4 -s start
$cnt = 0
wait_dnode4_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode4_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do 2**** **** **** ****
return
endi
if $dnode4Status != ready then
sleep 2000
goto wait_dnode4_ready_0
endi
print ============== step13: alter replica 2
sql alter database $db replica 2
sql show database
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
if $data0_5 != 2 then
print rplica is not modify to 2, error!!!!!!
return
endi
print ============== step14: stop and drop dnode4, then remove data dir of dnode4
system sh/exec.sh -n dnode4 -s stop -x SIGINT
$cnt = 0
wait_dnode4_offline_1:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode4_offline_1
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do 2**** **** **** ****
return
endi
if $dnode4Status != offline then
sleep 2000
goto wait_dnode4_offline_1
endi
sql drop dnode $hostname4
system rm -rf ../../../sim/dnode4
print ============== step15: alter replica 1
sql alter database $db replica 1
sql show database
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
if $data0_5 != 1 then
print rplica is not modify to 1, error!!!!!!
return
endi
print ============== step16: alter replica 2
sql alter database $db replica 1
sql show database
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
if $data0_5 != 2 then
print rplica is not modify to 2, error!!!!!!
return
endi
print ============== step17: start dnode1 and add into cluster, then wait dnode1 ready
system sh/exec.sh -n dnode1 -s start
sql create dnode $hostname1
wait_dnode1_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode1_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode1Status = $data4_1
elif $loop_cnt == 1 then
$dnode1Status = $data4_5
elif $loop_cnt == 2 then
$dnode1Status = $data4_7
elif $loop_cnt == 3 then
$dnode1Status = $data4_9
else then
print **** **** **** END loop cluster do 3**** **** **** ****
return
endi
if $dnode1Status != ready then
sleep 2000
goto wait_dnode1_ready_0
endi
print ============== step18: alter replica 3
sql alter database $db replica 3
sql show database
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
if $data0_5 != 3 then
print rplica is not modify to 3, error!!!!!!
return
endi
$loop_cnt = $loop_cnt + 1
goto loop_cluster_do
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册