提交 15f0e77c 编写于 作者: S Shengliang Guan

[TD-570] fix wrong response handle when create child tables

上级 99ebc7e0
......@@ -118,6 +118,8 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
mnodeCreateMsg(pWrite, pMsg);
dTrace("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
......@@ -147,19 +149,21 @@ void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
}
static void *dnodeProcessMnodeWriteQueue(void *param) {
SMnodeMsg *pWriteMsg;
SMnodeMsg *pWrite;
int32_t type;
void * unUsed;
while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWriteMsg, &unUsed) == 0) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
dTrace("dnodeProcessMnodeWriteQueue: got no message from qset, exiting...");
break;
}
dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWriteMsg);
dnodeSendRpcMnodeWriteRsp(pWriteMsg, code);
dTrace("app:%p:%p, msg:%s will be processed in mwrite queue", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWrite);
dnodeSendRpcMnodeWriteRsp(pWrite, code);
}
return NULL;
......@@ -169,9 +173,15 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
SMnodeMsg *pWrite = pMsg;
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
dTrace("app:%p:%p, msg:%s is redirected for mnode not running, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMnodeWriteMsg(pWrite);
} else {
} else {
dTrace("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
}
......
......@@ -641,6 +641,11 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
pHead->len = pOper->rowSize;
memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) {
sdbTrace("app:%p:%p, insert action is add to write queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
}
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
}
......@@ -687,7 +692,6 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE;
}
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize;
SSdbOper *pNewOper = taosAllocateQitem(size);
......@@ -698,6 +702,11 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
memcpy(pHead->cont, key, keySize);
memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) {
sdbTrace("app:%p:%p, delete action is add to write queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
}
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
}
......@@ -740,7 +749,12 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
(*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize;
memcpy(pNewOper, pOper, sizeof(SSdbOper));
memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) {
sdbTrace("app:%p:%p, update action is add to write queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
}
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
}
......@@ -961,6 +975,10 @@ static void *sdbWorkerFp(void *param) {
pOper = NULL;
}
if (pOper != NULL && pOper->pMsg != NULL) {
sdbTrace("app:%p:%p, will be processed in sdb queue", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg);
}
int32_t code = sdbWrite(pOper, pHead, type);
if (pOper) pOper->retCode = code;
}
......@@ -976,6 +994,12 @@ static void *sdbWorkerFp(void *param) {
if (pOper->cb) {
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
}
if (pOper != NULL && pOper->pMsg != NULL) {
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
tstrerror(pOper->retCode));
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
}
taosFreeQitem(item);
......
......@@ -688,10 +688,12 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
}
if (pCreate->numOfTags != 0) {
mTrace("table:%s, create stable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
mTrace("app:%p:%p, table:%s, create stable msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg,
pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateSuperTableMsg(pMsg);
} else {
mTrace("table:%s, create ctable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
mTrace("app:%p:%p, table:%s, create ctable msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg,
pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateChildTableMsg(pMsg);
}
}
......@@ -1466,14 +1468,13 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static SChildTableObj* mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
SVgObj *pVgroup = pMsg->pVgroup;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj));
if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId);
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
if (pCreate->numOfColumns == 0) {
......@@ -1493,8 +1494,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name);
mnodeDestroyChildTable(pTable);
terrno = TSDB_CODE_MND_INVALID_TABLE_NAME;
return NULL;
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
mnodeDecTableRef(pSuperTable);
......@@ -1513,8 +1513,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
......@@ -1530,15 +1529,17 @@ static SChildTableObj* mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pTable->sql = calloc(1, pTable->sqlLen);
if (pTable->sql == NULL) {
free(pTable);
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
pTable->sql[pTable->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pTable->info.tableId, pTable->sqlLen, pTable->sql);
}
}
pMsg->pTable = (STableObj *)pTable;
mnodeIncTableRef(pMsg->pTable);
SSdbOper desc = {0};
desc.type = SDB_OPER_GLOBAL;
desc.pObj = pTable;
......@@ -1550,12 +1551,12 @@ static SChildTableObj* mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
if (code != TSDB_CODE_SUCCESS) {
free(pTable);
mError("table:%s, update sdb error", pCreate->tableId);
terrno = TSDB_CODE_MND_SDB_ERROR;
return NULL;
pMsg->pTable = NULL;
return code;
} else {
mTrace("table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64, pTable->info.tableId, pVgroup->vgId, pTable->sid,
pTable->uid);
return pTable;
return TSDB_CODE_SUCCESS;
}
}
......@@ -1563,13 +1564,14 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create, grant timeseries failed", pCreate->tableId);
mError("app:%p:%p, table:%s, failed to create, grant timeseries failed", pMsg->rpcMsg.ahandle, pMsg,
pCreate->tableId);
return code;
}
SVgObj *pVgroup = mnodeGetAvailableVgroup(pMsg->pDb);
if (pVgroup == NULL) {
mTrace("table:%s, start to create a new vgroup", pCreate->tableId);
mTrace("app:%p:%p, table:%s, start to create a new vgroup", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return mnodeCreateVgroup(pMsg, pMsg->pDb);
}
......@@ -1577,7 +1579,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
mTrace("tables:%s, no enough sid in vgId:%d", pCreate->tableId, pVgroup->vgId);
mTrace("app:%p:%p, table:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
pVgroup->vgId);
return mnodeCreateVgroup(pMsg, pMsg->pDb);
}
......@@ -1586,21 +1589,27 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
mnodeIncVgroupRef(pVgroup);
}
pMsg->pTable = (STableObj *)mnodeDoCreateChildTable(pMsg, sid);
if (pMsg->pTable == NULL) {
return terrno;
}
mTrace("app:%p:%p, table:%s, create table in vgroup, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
pVgroup->vgId, sid);
mnodeIncTableRef(pMsg->pTable);
code = mnodeDoCreateChildTable(pMsg, sid);
if (code != TSDB_CODE_SUCCESS) {
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
} else {
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId);
}
if (pMsg->pTable == NULL) {
mError("app:%p:%p, table:%s, object not found, retry:%d reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
tstrerror(terrno));
return terrno;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
mTrace("app:%p:%p, table:%s, send create msg to vnode again", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return mnodeDoCreateChildTableCb(pMsg, TSDB_CODE_SUCCESS);
}
}
......@@ -2007,8 +2016,10 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
}
// handle create table response from dnode
// if failed, drop the table cached
/*
* handle create table response from dnode
* if failed, drop the table cached
*/
static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return;
......@@ -2017,18 +2028,18 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable);
mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle,
tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
if (rpcMsg->code != TSDB_CODE_SUCCESS && rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
if (mnodeMsg->retry++ < 10) {
mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId,
mnodeMsg->retry, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
mTrace("app:%p:%p, table:%s, create table rsp received, need retry, times:%d result:%s thandle:%p",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry,tstrerror(rpcMsg->code),
mnodeMsg->rpcMsg.handle);
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
} else {
mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId,
mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
mError("app:%p:%p, table:%s, failed to create in dnode, result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb,
......@@ -2037,16 +2048,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
sdbDeleteRow(&oper);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
}
}
} else {
mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle,
tstrerror(rpcMsg->code));
SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont;
if (pCreate->getMeta) {
mTrace("table:%s, continue to get meta", pTable->info.tableId);
mTrace("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p", mnodeMsg->rpcMsg.ahandle,
mnodeMsg, pTable->info.tableId, mnodeMsg->rpcMsg.handle);
mnodeMsg->retry = 0;
dnodeReprocessMnodeWriteMsg(mnodeMsg);
} else {
mTrace("app:%p:%p, table:%s, created in dnode, thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, mnodeMsg->rpcMsg.handle);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
}
}
......
......@@ -43,7 +43,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg))
int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
if (pMsg->rpcMsg.pCont == NULL) {
mError("%p, msg:%s in mwrite queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
mError("app:%p:%p, msg:%s content is null", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN;
}
......@@ -54,27 +54,31 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet);
mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse);
mTrace("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
mTrace("app:%p:%p, mnode index:%d ip:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, ipSet->fqdn[i],
htons(ipSet->port[i]));
}
return TSDB_CODE_RPC_REDIRECT;
}
if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("%p, msg:%s in mwrite queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
mError("app:%p:%p, msg:%s not processed", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
}
int32_t code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) {
mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code));
mError("app:%p:%p, msg:%s not processed, reason:%s", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
tstrerror(code));
return code;
}
if (!pMsg->pUser->writeAuth) {
mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
mError("app:%p:%p, msg:%s not processed, no write auth", pMsg->rpcMsg.ahandle, pMsg,
taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_NO_RIGHTS;
}
......
......@@ -333,28 +333,28 @@ cd ../../../debug; make
./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_nomaster.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_nomaster.sim
./test.sh -f unique/arbitrator/dn3_mn2_killDnode.sim
./test.sh -f unique/arbitrator/insert_duplicationTs.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica2_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica3_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropTable_online.sim
./test.sh -f unique/arbitrator/replica_changeWithArbitrator.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim
#./test.sh -f unique/arbitrator/insert_duplicationTs.sim
#./test.sh -f unique/arbitrator/offline_replica2_alterTable_online.sim
#./test.sh -f unique/arbitrator/offline_replica2_alterTag_online.sim
#./test.sh -f unique/arbitrator/offline_replica2_createTable_online.sim
#./test.sh -f unique/arbitrator/offline_replica2_dropDb_online.sim
#./test.sh -f unique/arbitrator/offline_replica2_dropTable_online.sim
#./test.sh -f unique/arbitrator/offline_replica3_alterTable_online.sim
#./test.sh -f unique/arbitrator/offline_replica3_alterTag_online.sim
#./test.sh -f unique/arbitrator/offline_replica3_createTable_online.sim
#./test.sh -f unique/arbitrator/offline_replica3_dropDb_online.sim
#./test.sh -f unique/arbitrator/offline_replica3_dropTable_online.sim
#./test.sh -f unique/arbitrator/replica_changeWithArbitrator.sim
#./test.sh -f unique/arbitrator/sync_replica2_alterTable_add.sim
#./test.sh -f unique/arbitrator/sync_replica2_alterTable_drop.sim
#./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim
#./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim
#./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim
#./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim
#./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
#./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim
......@@ -33,6 +33,7 @@ typedef struct {
int threadIndex;
char dbName[32];
char stableName[64];
float createTableSpeed;
pthread_t thread;
} SInfo;
......@@ -49,8 +50,8 @@ int64_t numOfThreads = 1;
int64_t numOfTablesPerThread = 200;
char dbName[32] = "db";
char stableName[64] = "st";
int32_t cache = 16384;
int32_t tables = 1000;
int32_t cache = 16;
int32_t tables = 5000;
int main(int argc, char *argv[]) {
shellParseArgument(argc, argv);
......@@ -63,9 +64,8 @@ int main(int argc, char *argv[]) {
void createDbAndTable() {
pPrint("start to create table");
TAOS_RES * pSql;
TAOS * con;
struct timeval systemTime;
int64_t st, et;
char qstr[64000];
char fqdn[TSDB_FQDN_LEN];
......@@ -77,22 +77,24 @@ void createDbAndTable() {
exit(1);
}
sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables);
if (taos_query(con, qstr)) {
pError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con));
sprintf(qstr, "create database if not exists %s cache %d maxtables %d", dbName, cache, tables);
pSql = taos_query(con, qstr);
int32_t code = taos_errno(pSql);
if (code != 0) {
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con));
exit(0);
}
taos_free_result(pSql);
sprintf(qstr, "use %s", dbName);
if (taos_query(con, qstr)) {
pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
exit(0);
}
taos_free_result(pSql);
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
int64_t totalTables = numOfTablesPerThread * numOfThreads;
if (strcmp(stableName, "no") != 0) {
int len = sprintf(qstr, "create table if not exists %s(ts timestamp", stableName);
for (int64_t f = 0; f < pointsPerTable; ++f) {
......@@ -100,36 +102,14 @@ void createDbAndTable() {
}
sprintf(qstr + len, ") tags(t int)");
if (taos_query(con, qstr)) {
pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
exit(0);
}
for (int64_t t = 0; t < totalTables; ++t) {
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
if (taos_query(con, qstr)) {
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
}
} else {
for (int64_t t = 0; t < totalTables; ++t) {
int len = sprintf(qstr, "create table if not exists %s%ld(ts timestamp", stableName, t);
for (int64_t f = 0; f < pointsPerTable; ++f) {
len += sprintf(qstr + len, ", f%ld double", f);
}
sprintf(qstr + len, ")");
if (taos_query(con, qstr)) {
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
}
taos_free_result(pSql);
}
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
pPrint("%.1f seconds to create %ld tables", (et - st) / 1000.0 / 1000.0, totalTables);
}
void insertData() {
......@@ -144,7 +124,7 @@ void insertData() {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
SInfo *pInfo = (SInfo *)malloc(sizeof(SInfo) * numOfThreads);
SInfo *pInfo = (SInfo *)calloc(numOfThreads, sizeof(SInfo));
// Start threads to write
for (int i = 0; i < numOfThreads; ++i) {
......@@ -173,10 +153,15 @@ void insertData() {
double speedOfRows = totalRows / seconds;
double speedOfPoints = totalPoints / seconds;
float createTableSpeed = 0;
for (int i = 0; i < numOfThreads; ++i) {
createTableSpeed += pInfo[i].createTableSpeed;
}
pPrint(
"%sall threads:%ld finished, use %.1lf seconds, tables:%.ld rows:%ld points:%ld, speed RowsPerSecond:%.1lf "
"PointsPerSecond:%.1lf%s",
GREEN, numOfThreads, seconds, totalTables, totalRows, totalPoints, speedOfRows, speedOfPoints, NC);
"PointsPerSecond:%.1lf CreateTableSpeed:%.1f t/s %s",
GREEN, numOfThreads, seconds, totalTables, totalRows, totalPoints, speedOfRows, speedOfPoints, createTableSpeed, NC);
pPrint("threads exit");
......@@ -191,6 +176,7 @@ void *syncTest(void *param) {
int64_t st, et;
char qstr[65000];
int maxBytes = 60000;
int code;
pPrint("thread:%d, start to run", pInfo->threadIndex);
......@@ -210,6 +196,48 @@ void *syncTest(void *param) {
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
if (strcmp(stableName, "no") != 0) {
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
TAOS_RES *pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
taos_free_result(pSql);
}
} else {
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int len = sprintf(qstr, "create table if not exists %s%ld(ts timestamp", stableName, t);
for (int64_t f = 0; f < pointsPerTable; ++f) {
len += sprintf(qstr + len, ", f%ld double", f);
}
sprintf(qstr + len, ")");
TAOS_RES *pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
taos_free_result(pSql);
}
}
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
float seconds = (et - st) / 1000.0 / 1000.0;
int64_t tables = pInfo->tableEndIndex - pInfo->tableBeginIndex;
pInfo->createTableSpeed = (float)tables / seconds;
pPrint("thread:%d, %.1f seconds to create %ld tables, speed:%.1f", pInfo->threadIndex, seconds, tables,
pInfo->createTableSpeed);
if (pInfo->rowsPerTable == 0) return NULL;
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
int64_t start = 1430000000000;
int64_t interval = 1000; // 1000 ms
......@@ -227,10 +255,13 @@ void *syncTest(void *param) {
}
len += sprintf(sql + len, ")");
if (len > maxBytes) {
if (taos_query(con, qstr)) {
TAOS_RES *pSql = taos_query(con, qstr);
int32_t code = taos_errno(pSql);
if (code != 0) {
pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName,
table, row, taos_errstr(con));
}
taos_free_result(pSql);
// "insert into"
len = sprintf(sql, "%s", inserStr);
......@@ -239,7 +270,8 @@ void *syncTest(void *param) {
}
if (len != strlen(inserStr)) {
taos_query(con, qstr);
TAOS_RES *pSql = taos_query(con, qstr);
taos_free_result(pSql);
}
gettimeofday(&systemTime, NULL);
......
......@@ -49,8 +49,8 @@ int64_t numOfThreads = 1;
int64_t numOfTablesPerThread = 1;
char dbName[32] = "db";
char stableName[64] = "st";
int32_t cache = 16384;
int32_t tables = 1000;
int32_t cache = 16;
int32_t tables = 5000;
int main(int argc, char *argv[]) {
shellParseArgument(argc, argv);
......@@ -63,6 +63,7 @@ int main(int argc, char *argv[]) {
void createDbAndTable() {
pPrint("start to create table");
TAOS_RES * pSql;
TAOS * con;
struct timeval systemTime;
int64_t st, et;
......@@ -79,17 +80,22 @@ void createDbAndTable() {
exit(1);
}
sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables);
if (taos_query(con, qstr)) {
pError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con));
sprintf(qstr, "create database if not exists %s cache %d maxtables %d", dbName, cache, tables);
pSql = taos_query(con, qstr);
int32_t code = taos_errno(pSql);
if (code != 0) {
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con));
exit(0);
}
sprintf(qstr, "use %s", dbName);
if (taos_query(con, qstr)) {
pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
exit(0);
}
taos_stop_query(pSql);
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
......@@ -102,17 +108,23 @@ void createDbAndTable() {
}
sprintf(qstr + len, ") tags(t int)");
if (taos_query(con, qstr)) {
pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
exit(0);
}
taos_stop_query(pSql);
for (int64_t t = 0; t < totalTables; ++t) {
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
if (taos_query(con, qstr)) {
pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
taos_stop_query(pSql);
}
} else {
for (int64_t t = 0; t < totalTables; ++t) {
......@@ -122,16 +134,20 @@ void createDbAndTable() {
}
sprintf(qstr + len, ")");
if (taos_query(con, qstr)) {
pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
taos_stop_query(pSql);
}
}
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
pPrint("%.1f seconds to create %ld tables", (et - st) / 1000.0 / 1000.0, totalTables);
float seconds = (et - st) / 1000.0 / 1000.0;
pPrint("%.1f seconds to create %ld tables, speed:%.1f", seconds, totalTables, totalTables / seconds);
}
void insertData() {
......@@ -141,7 +157,12 @@ void insertData() {
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
pPrint("%d threads are spawned to insert data", numOfThreads);
if (rowsPerTable <= 0) {
pPrint("not insert data for rowsPerTable is :%d", rowsPerTable);
exit(0);
} else {
pPrint("%d threads are spawned to insert data", numOfThreads);
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
......@@ -230,10 +251,13 @@ void *syncTest(void *param) {
}
len += sprintf(sql + len, ")");
if (len > maxBytes) {
if (taos_query(con, qstr)) {
TAOS_RES *pSql = taos_query(con, qstr);
int32_t code = taos_errno(pSql);
if (code != 0) {
pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName,
table, row, taos_errstr(con));
}
taos_stop_query(pSql);
// "insert into"
len = sprintf(sql, "%s", inserStr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册