提交 344c0655 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

......@@ -317,6 +317,7 @@ typedef struct SSqlObj {
SRpcIpSet ipList;
char freed : 4;
char listed : 4;
uint32_t insertType;
tsem_t rspSem;
SSqlCmd cmd;
SSqlRes res;
......@@ -402,6 +403,7 @@ void tscCloseTscObj(STscObj *pObj);
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
......
......@@ -482,6 +482,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
} else {
code = tsParseSql(pSql, false);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STMT_INSERT) == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL);
(*pSql->fp)(pSql->param, NULL, code);
return;
}
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
}
}
......
......@@ -1312,6 +1312,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pSql->insertType);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) {
......@@ -1339,7 +1340,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
* the error handle callback function can rightfully restore the user-defined callback function (fp).
*/
if (initialParse) {
if (initialParse && (pSql->insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}
......
......@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taos.h"
#include "tsclient.h"
......@@ -20,6 +21,7 @@
#include "taosmsg.h"
#include "tstrbuild.h"
#include "tscLog.h"
#include "tscSubquery.h"
int tsParseInsertSql(SSqlObj *pSql);
int taos_query_imp(STscObj* pObj, SSqlObj* pSql);
......@@ -262,7 +264,11 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data, param->type, param->bytes);
if (param->type == TSDB_DATA_TYPE_BINARY || param->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(data + param->offset, param->type);
} else {
setNull(data + param->offset, param->type, param->bytes);
}
return TSDB_CODE_SUCCESS;
}
......@@ -297,14 +303,17 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
return TSDB_CODE_INVALID_VALUE;
}
size = (short)*bind->length;
break;
STR_WITH_SIZE_TO_VARSTR(data + param->offset, bind->buffer, size);
return TSDB_CODE_SUCCESS;
case TSDB_DATA_TYPE_NCHAR:
if (!taosMbsToUcs4(bind->buffer, *bind->length, data + param->offset, param->bytes, NULL)) {
case TSDB_DATA_TYPE_NCHAR: {
size_t output = 0;
if (!taosMbsToUcs4(bind->buffer, *bind->length, varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_INVALID_VALUE;
}
}
varDataSetLen(data + param->offset, output);
return TSDB_CODE_SUCCESS;
}
default:
assert(false);
return TSDB_CODE_INVALID_VALUE;
......@@ -383,14 +392,6 @@ static int insertStmtAddBatch(STscStmt* stmt) {
return TSDB_CODE_SUCCESS;
}
static int insertStmtPrepare(STscStmt* stmt) {
SSqlObj *pSql = stmt->pSql;
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
return tsParseInsertSql(pSql);
}
static int insertStmtReset(STscStmt* pStmt) {
SSqlCmd* pCmd = &pStmt->pSql->cmd;
if (pCmd->batchSize > 2) {
......@@ -451,14 +452,16 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->qhandle = 0;
pSql->insertType = 0;
pSql->fetchFp = waitForQueryRsp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
tscDoQuery(pSql);
// tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscPartiallyFreeSqlObj(pSql);
}
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
return pSql->res.code;
return pRes->code;
}
////////////////////////////////////////////////////////////////////////////////
......@@ -478,6 +481,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
tscError("failed to allocate memory for statement");
return NULL;
}
pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
......@@ -488,8 +492,10 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
}
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->pTscObj->pSql = pSql;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pStmt->pSql = pSql;
return pStmt;
......@@ -497,22 +503,55 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
if (length == 0) {
length = strlen(sql);
if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
terrno = TSDB_CODE_DISCONNECTED;
return TSDB_CODE_DISCONNECTED;
}
SSqlObj* pSql = pStmt->pSql;
size_t sqlLen = strlen(sql);
//doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->param = (void*)pStmt->taos;
pSql->fp = waitForQueryRsp;
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p failed to malloc payload buffer", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
char* sqlstr = (char*)malloc(length + 1);
if (sqlstr == NULL) {
tscError("failed to malloc sql string buffer");
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
free(pCmd->payload);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
memcpy(sqlstr, sql, length);
sqlstr[length] = 0;
strtolower(sqlstr, sqlstr);
pRes->qhandle = 0;
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sql);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
pStmt->pSql->sqlstr = sqlstr;
if (tscIsInsertData(sqlstr)) {
if (tscIsInsertData(pSql->sqlstr)) {
pStmt->isInsert = true;
return insertStmtPrepare(pStmt);
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
return pSql->res.code;
}
return code;
}
pStmt->isInsert = false;
......@@ -574,7 +613,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
} else {
tfree(pStmt->pSql->sqlstr);
pStmt->pSql->sqlstr = sql;
ret = taos_query_imp(pStmt->taos, pStmt->pSql);
ret = taos_query(pStmt->taos, pStmt->pSql->sqlstr);
}
}
return ret;
......
......@@ -264,7 +264,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
return pRes->code;
}
static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((STscObj *)param)->pSql;
......
......@@ -331,6 +331,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
......
......@@ -117,7 +117,7 @@ int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *size);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
// the TSDB repository info
typedef struct STsdbRepoInfo {
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#define TAOS_SYNC_MAX_REPLICA 5
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef enum _TAOS_SYNC_ROLE {
TAOS_SYNC_ROLE_OFFLINE,
......@@ -53,11 +54,16 @@ typedef struct {
uint32_t nodeId[TAOS_SYNC_MAX_REPLICA];
int role[TAOS_SYNC_MAX_REPLICA];
} SNodesRole;
// if name is empty(name[0] is zero), get the file from index or after, used by master
// if name is provided(name[0] is not zero), get the named file at the specified index, used by unsynced node
// it returns the file magic number and size, if file not there, magic shall be 0.
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
/*
if name is empty(name[0] is zero), get the file from index or after, but not larger than eindex. If a file
is found between index and eindex, index shall be updated, name shall be set, size shall be set to
file size, and file magic number shall be returned.
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
*/
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
......
......@@ -200,7 +200,7 @@ void sdbUpdateMnodeRoles() {
mnodeUpdateMnodeIpSet();
}
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) {
sdbUpdateMnodeRoles();
return 0;
}
......
......@@ -508,6 +508,8 @@ void tsdbAdjustCacheBlocks(STsdbCache *pCache);
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
int compFGroupKey(const void *key, const void *fgroup);
#ifdef __cplusplus
}
#endif
......
......@@ -35,7 +35,6 @@ const char *tsdbFileSuffix[] = {
".last" // TSDB_FILE_TYPE_LAST
};
static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
......@@ -285,7 +284,7 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf
return 0;
}
static int compFGroupKey(const void *key, const void *fgroup) {
int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
if (fid == pFGroup->fileId) {
......
......@@ -1199,46 +1199,71 @@ static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
}
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *size) {
// TODO: need to refactor this function
#define TSDB_META_FILE_INDEX 10000000
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
uint32_t magic = 0;
char fname[256] = "\0";
uint32_t magic = 0;
char fname[256] = "\0";
struct stat fState;
char *spath = strdup(pRepo->rootDir);
char *prefixDir = dirname(spath);
if (name[0] == 0) {
// Map index to the file name
tsdbTrace("vgId:%d name:%s index:%d eindex:%d", pRepo->config.tsdbId, name, *index, eindex);
ASSERT(*index <= eindex);
char *sdup = strdup(pRepo->rootDir);
char *prefix = dirname(sdup);
if (name[0] == 0) { // get the file from index or after, but not larger than eindex
int fid = (*index) / 3;
if (fid >= pFileH->numOfFGroups) {
// return meta data file
if ((*index) % 3 > 0) { // it is finished
tfree(spath);
if (pFileH->numOfFGroups == 0 || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) {
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
tsdbGetMetaFileName(pRepo->rootDir, fname);
*index = TSDB_META_FILE_INDEX;
} else {
tfree(sdup);
return 0;
}
} else {
SFileGroup *pFGroup =
taosbsearch(&fid, pFileH->fGroup, pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey, TD_GE);
if (pFGroup->fileId == fid) {
strcpy(fname, pFGroup->files[(*index) % 3].fname);
} else {
tsdbGetMetaFileName(pRepo->rootDir, fname);
if (pFGroup->fileId * 3 + 2 < eindex) {
strcpy(fname, pFGroup->files[0].fname);
*index = pFGroup->fileId * 3;
} else {
tfree(sdup);
return 0;
}
}
}
strcpy(name, fname + strlen(prefix));
} else { // get the named file at the specified index. If not there, return 0
if (*index == TSDB_META_FILE_INDEX) { // get meta file
tsdbGetMetaFileName(pRepo->rootDir, fname);
} else {
// return data file name
strcpy(fname, pFileH->fGroup[fid].files[(*index) % 3].fname);
int fid = (*index) / 3;
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid);
if (pFGroup == NULL) { // not found
tfree(sdup);
return 0;
}
SFile *pFile = &pFGroup->files[(*index) % 3];
strcpy(fname, pFile->fname);
}
strcpy(name, fname + strlen(spath));
} else {
// Name is provided, need to get the file info
sprintf(fname, "%s/%s", prefixDir, name);
}
if (stat(fname, &fState) < 0) {
tfree(spath);
tfree(sdup);
return 0;
}
tfree(sdup);
*size = fState.st_size;
magic = *size;
......
......@@ -39,7 +39,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
......@@ -433,10 +433,10 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
return 0;
}
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) {
SVnodeObj *pVnode = ahandle;
*fversion = pVnode->fversion;
return tsdbGetFileInfo(pVnode->tsdb, name, index, size);
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
}
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
......
// TAOS standard API example. The same syntax as MySQL, but only a subet
// to compile: gcc -o prepare prepare.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"
void taosMsleep(int mseconds);
int main(int argc, char *argv[])
{
TAOS *taos;
TAOS_RES *result;
TAOS_STMT *stmt;
// connect to server
if (argc < 2) {
printf("please input server ip \n");
return 0;
}
// init TAOS
taos_init();
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1);
}
taos_query(taos, "drop database demo");
if (taos_query(taos, "create database demo") != 0) {
printf("failed to create database, reason:%s\n", taos_errstr(taos));
exit(1);
}
taos_query(taos, "use demo");
// create table
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))";
if (taos_query(taos, sql) != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(taos));
exit(1);
}
// sleep for one second to make sure table is created on data node
// taosMsleep(1000);
// insert 10 records
struct {
int64_t ts;
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
char bin[40];
char blob[80];
} v = {0};
stmt = taos_stmt_init(taos);
TAOS_BIND params[10];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
params[0].length = &params[0].buffer_length;
params[0].is_null = NULL;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b);
params[1].buffer = &v.b;
params[1].length = &params[1].buffer_length;
params[1].is_null = NULL;
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
params[2].buffer_length = sizeof(v.v1);
params[2].buffer = &v.v1;
params[2].length = &params[2].buffer_length;
params[2].is_null = NULL;
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
params[3].buffer_length = sizeof(v.v2);
params[3].buffer = &v.v2;
params[3].length = &params[3].buffer_length;
params[3].is_null = NULL;
params[4].buffer_type = TSDB_DATA_TYPE_INT;
params[4].buffer_length = sizeof(v.v4);
params[4].buffer = &v.v4;
params[4].length = &params[4].buffer_length;
params[4].is_null = NULL;
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
params[5].buffer_length = sizeof(v.v8);
params[5].buffer = &v.v8;
params[5].length = &params[5].buffer_length;
params[5].is_null = NULL;
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[6].buffer_length = sizeof(v.f4);
params[6].buffer = &v.f4;
params[6].length = &params[6].buffer_length;
params[6].is_null = NULL;
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
params[7].buffer_length = sizeof(v.f8);
params[7].buffer = &v.f8;
params[7].length = &params[7].buffer_length;
params[7].is_null = NULL;
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
params[8].buffer_length = sizeof(v.bin);
params[8].buffer = v.bin;
params[8].length = &params[8].buffer_length;
params[8].is_null = NULL;
strcpy(v.blob, "一二三四五六七八九十");
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[9].buffer_length = strlen(v.blob);
params[9].buffer = v.blob;
params[9].length = &params[9].buffer_length;
params[9].is_null = NULL;
int is_null = 1;
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
}
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 10; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
v.v1 = (int8_t)i;
v.v2 = (int16_t)(i * 2);
v.v4 = (int32_t)(i * 4);
v.v8 = (int64_t)(i * 8);
v.f4 = (float)(i * 40);
v.f8 = (double)(i * 80);
for (int j = 0; j < sizeof(v.bin) - 1; ++j) {
v.bin[j] = (char)(i + '0');
}
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
}
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute insert statement.\n");
exit(1);
}
taos_stmt_close(stmt);
printf("==== success inset data ====.\n");
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
v.v1 = 5;
v.v2 = 15;
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute select statement.\n");
exit(1);
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256];
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
taos_stmt_close(stmt);
return getchar();
}
......@@ -20,32 +20,30 @@ from util.cases import *
from util.sql import *
from util.dnodes import *
current_tb = ""
last_tb = ""
last_stb = ""
written = 0
class Test (threading.Thread):
def __init__(self, threadId, name, sleepTime):
def __init__(self, threadId, name):
threading.Thread.__init__(self)
self.threadId = threadId
self.name = name
self.sleepTime = sleepTime
self.threadLock = threading.Lock()
def create_table(self):
global current_tb
tdLog.info("create_table")
global last_tb
global written
tdLog.info("create a table")
current_tb = "tb%d" % int(round(time.time() * 1000))
tdLog.info("current table %s" % current_tb)
if (current_tb == last_tb):
return
else:
tdLog.info("will create table %s" % current_tb)
tdSql.execute(
'create table %s (ts timestamp, speed int)' %
current_tb)
......@@ -53,30 +51,27 @@ class Test (threading.Thread):
written = 0
def insert_data(self):
global current_tb
tdLog.info("insert_data")
global last_tb
global written
tdLog.info("will insert data to table")
if (current_tb == ""):
if (last_tb == ""):
tdLog.info("no table, create first")
self.create_table()
tdLog.info("insert data to table")
tdLog.info("will insert data to table")
for i in range(0, 10):
self.threadLock.acquire()
insertRows = 1000
tdLog.info("insert %d rows to %s" % (insertRows, current_tb))
tdLog.info("insert %d rows to %s" % (insertRows, last_tb))
for j in range(0, insertRows):
ret = tdSql.execute(
'insert into %s values (now + %dm, %d)' %
(current_tb, j, j))
(last_tb, j, j))
written = written + 1
self.threadLock.release()
def query_data(self):
global current_tb
tdLog.info("query_data")
global last_tb
global written
......@@ -86,53 +81,90 @@ class Test (threading.Thread):
tdSql.checkRows(written)
def create_stable(self):
global current_tb
tdLog.info("create_stable")
global last_tb
global last_stb
global written
current_stb = "stb%d" % int(round(time.time() * 1000))
tdLog.info("create a super table")
if (current_stb == last_stb):
return
else:
tdLog.info("will create stable %s" % current_stb)
tdSql.execute(
'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
current_stb)
last_stb = current_stb
current_tb = "tb%d" % int(round(time.time() * 1000))
tdSql.execute(
"create table %s using %s tags (1, '表1')" %
(current_tb, last_stb))
last_tb = current_tb
tdSql.execute(
"insert into %s values (now, 27, '我是nchar字符串')" %
last_tb)
self.written = self.written + 1
def drop_stable(self):
tdLog.info("drop_stable")
global last_stb
if (last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will drop last super table")
tdSql.execute('drop table %s' % last_stb)
last_stb = ""
def restart_database(self):
global current_tb
tdLog.info("restart_database")
global last_tb
global written
tdLog.info("restart databae")
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.sleep(5)
def force_restart(self):
global current_tb
def force_restart_database(self):
tdLog.info("force_restart_database")
global last_tb
global written
tdLog.info("force restart database")
tdDnodes.forcestop(1)
tdDnodes.start(1)
tdLog.sleep(5)
def drop_table(self):
global current_tb
tdLog.info("drop_table")
global last_tb
global written
for i in range(0, 10):
self.threadLock.acquire()
tdLog.info("current_tb %s" % current_tb)
if (current_tb != ""):
tdLog.info("drop current tb %s" % current_tb)
tdSql.execute("drop table %s" % current_tb)
current_tb = ""
if (last_tb != ""):
tdLog.info("drop last_tb %s" % last_tb)
tdSql.execute("drop table %s" % last_tb)
last_tb = ""
written = 0
tdLog.sleep(self.sleepTime)
self.threadLock.release()
def query_data_from_stable(self):
tdLog.info("query_data_from_stable")
global last_stb
if (last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will query data from super table")
tdSql.execute('select * from %s' % last_stb)
def reset_query_cache(self):
global current_tb
tdLog.info("reset_query_cache")
global last_tb
global written
......@@ -141,51 +173,66 @@ class Test (threading.Thread):
tdLog.sleep(1)
def reset_database(self):
global current_tb
tdLog.info("reset_database")
global last_tb
global written
tdLog.info("reset database")
tdDnodes.forcestop(1)
tdDnodes.deploy(1)
current_tb = ""
last_tb = ""
written = 0
tdDnodes.start(1)
tdSql.prepare()
def delete_datafiles(self):
global current_tb
tdLog.info("delete_data_files")
global last_tb
global written
tdLog.info("delete data files")
dnodesDir = tdDnodes.getDnodesRootDir()
dataDir = dnodesDir + '/dnode1/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
current_tb = ""
last_tb = ""
written = 0
tdDnodes.start(1)
tdSql.prepare()
def run(self):
switch = {
dataOp = {
1: self.insert_data,
2: self.query_data,
3: self.query_data_from_stable,
}
dbOp = {
1: self.create_table,
2: self.insert_data,
3: self.query_data,
4: self.create_stable,
5: self.restart_database,
6: self.force_restart,
7: self.drop_table,
8: self.reset_query_cache,
9: self.reset_database,
10: self.delete_datafiles,
2: self.create_stable,
3: self.restart_database,
4: self.force_restart_database,
5: self.drop_table,
6: self.reset_query_cache,
7: self.reset_database,
8: self.delete_datafiles,
9: self.drop_stable,
}
switch.get(self.threadId, lambda: "ERROR")()
if (self.threadId == 1):
while True:
self.threadLock.acquire()
tdLog.notice("first thread")
randDataOp = random.randint(1, 3)
dataOp.get(randDataOp , lambda: "ERROR")()
self.threadLock.release()
elif (self.threadId == 2):
while True:
tdLog.notice("second thread")
self.threadLock.acquire()
randDbOp = random.randint(1, 9)
dbOp.get(randDbOp, lambda: "ERROR")()
self.threadLock.release()
class TDTestCase:
......@@ -196,8 +243,8 @@ class TDTestCase:
def run(self):
tdSql.prepare()
test1 = Test(2, "insert_data", 1)
test2 = Test(7, "drop_table", 2)
test1 = Test(1, "data operation")
test2 = Test(2, "db operation")
test1.start()
test2.start()
......
......@@ -21,101 +21,152 @@ from util.dnodes import *
class Test:
def __init__(self):
self.current_tb = ""
self.last_tb = ""
self.last_stb = ""
self.written = 0
def create_table(self):
tdLog.info("create a table")
self.current_tb = "tb%d" % int(round(time.time() * 1000))
tdLog.info("current table %s" % self.current_tb)
tdLog.info("create_table")
current_tb = "tb%d" % int(round(time.time() * 1000))
if (self.current_tb == self.last_tb):
if (current_tb == self.last_tb):
return
else:
tdLog.info("will create table %s" % current_tb)
tdSql.execute(
'create table %s (ts timestamp, speed int)' %
self.current_tb)
self.last_tb = self.current_tb
'create table %s (ts timestamp, c1 int, c2 nchar(10))' %
current_tb)
self.last_tb = current_tb
self.written = 0
def insert_data(self):
tdLog.info("will insert data to table")
if (self.current_tb == ""):
tdLog.info("insert_data")
if (self.last_tb == ""):
tdLog.info("no table, create first")
self.create_table()
tdLog.info("insert data to table")
tdLog.info("will insert data to table")
insertRows = 10
tdLog.info("insert %d rows to %s" % (insertRows, self.last_tb))
for i in range(0, insertRows):
ret = tdSql.execute(
'insert into %s values (now + %dm, %d)' %
(self.last_tb, i, i))
'insert into %s values (now + %dm, %d, "%s")' %
(self.last_tb, i, i, "->" + str(i)))
self.written = self.written + 1
tdLog.info("insert earlier data")
tdSql.execute('insert into %s values (now - 5m , 10)' % self.last_tb)
tdSql.execute(
'insert into %s values (now - 5m , 10, " - 5m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute('insert into %s values (now - 6m , 10)' % self.last_tb)
tdSql.execute(
'insert into %s values (now - 6m , 10, " - 6m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute('insert into %s values (now - 7m , 10)' % self.last_tb)
tdSql.execute(
'insert into %s values (now - 7m , 10, " - 7m")' %
self.last_tb)
self.written = self.written + 1
tdSql.execute('insert into %s values (now - 8m , 10)' % self.last_tb)
tdSql.execute(
'insert into %s values (now - 8m , 10, " - 8m")' %
self.last_tb)
self.written = self.written + 1
def query_data(self):
tdLog.info("query_data")
if (self.written > 0):
tdLog.info("query data from table")
tdSql.query("select * from %s" % self.last_tb)
tdSql.checkRows(self.written)
def create_stable(self):
tdLog.info("create a super table")
tdLog.info("create_stable")
current_stb = "stb%d" % int(round(time.time() * 1000))
if (current_stb == self.last_stb):
return
else:
tdLog.info("will create stable %s" % current_stb)
tdSql.execute(
'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' %
current_stb)
self.last_stb = current_stb
current_tb = "tb%d" % int(round(time.time() * 1000))
tdSql.execute(
"create table %s using %s tags (1, '表1')" %
(current_tb, self.last_stb))
self.last_tb = current_tb
tdSql.execute(
"insert into %s values (now, 27, '我是nchar字符串')" %
self.last_tb)
self.written = self.written + 1
def drop_stable(self):
tdLog.info("drop_stable")
if (self.last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will drop last super table")
tdSql.execute('drop table %s' % self.last_stb)
self.last_stb = ""
def query_data_from_stable(self):
tdLog.info("query_data_from_stable")
if (self.last_stb == ""):
tdLog.info("no super table")
return
else:
tdLog.info("will query data from super table")
tdSql.execute('select * from %s' % self.last_stb)
def restart_database(self):
tdLog.info("restart databae")
tdLog.info("restart_databae")
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.sleep(5)
def force_restart(self):
tdLog.info("force restart database")
def force_restart_database(self):
tdLog.info("force_restart_database")
tdDnodes.forcestop(1)
tdDnodes.start(1)
tdLog.sleep(5)
tdSql.prepare()
def drop_table(self):
if (self.current_tb != ""):
tdLog.info("drop current tb %s" % self.current_tb)
tdSql.execute("drop table %s" % self.current_tb)
self.current_tb = ""
tdLog.info("drop_table")
if (self.last_tb != ""):
tdLog.info("drop last tb %s" % self.last_tb)
tdSql.execute("drop table %s" % self.last_tb)
self.last_tb = ""
self.written = 0
def reset_query_cache(self):
tdLog.info("reset query cache")
tdLog.info("reset_query_cache")
tdSql.execute("reset query cache")
tdLog.sleep(1)
def reset_database(self):
tdLog.info("reset database")
tdLog.info("reset_database")
tdDnodes.forcestop(1)
tdDnodes.deploy(1)
self.current_tb = ""
self.last_tb = ""
self.written = 0
tdDnodes.start(1)
tdLog.sleep(5)
tdSql.prepare()
def delete_datafiles(self):
tdLog.info("delete data files")
tdLog.info("delete_datafiles")
dnodesDir = tdDnodes.getDnodesRootDir()
dataDir = dnodesDir + '/dnode1/*'
deleteCmd = 'rm -rf %s' % dataDir
os.system(deleteCmd)
self.current_tb = ""
self.last_tb = ""
self.written = 0
tdDnodes.start(1)
......@@ -138,15 +189,17 @@ class TDTestCase:
3: test.query_data,
4: test.create_stable,
5: test.restart_database,
6: test.force_restart,
6: test.force_restart_database,
7: test.drop_table,
8: test.reset_query_cache,
9: test.reset_database,
10: test.delete_datafiles,
11: test.query_data_from_stable,
12: test.drop_stable,
}
for x in range(1, 100):
r = random.randint(1, 10)
for x in range(1, 1000):
r = random.randint(1, 12)
tdLog.notice("iteration %d run func %d" % (x, r))
switch.get(r, lambda: "ERROR")()
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c numOfMPeers -v 1
system sh/cfg.sh -n dnode2 -c numOfMPeers -v 1
system sh/cfg.sh -n dnode3 -c numOfMPeers -v 1
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode2 -c walLevel -v 2
system sh/cfg.sh -n dnode3 -c walLevel -v 2
system sh/cfg.sh -n dnode4 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c alternativeRole -v 1
system sh/cfg.sh -n dnode2 -c alternativeRole -v 2
system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
print ============== step1: start dnode1, only deploy mnode
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
print ============== step2: start dnode2 and add into cluster , then create database with replica 1, and create table, insert data
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname2
sleep 3000
$totalTableNum = 10000
$sleepTimer = 10000
$db = db
sql create database $db replica 1 maxTables $totalTableNum
sql use $db
# create table , insert data
$stb = stb
sql create table $stb (ts timestamp, c1 int) tags(t1 int)
$rowNum = 100
$tblNum = $totalTableNum
$totalRows = 0
$tsStart = 1420041600000
$i = 0
while $i < $tblNum
$tb = tb . $i
sql create table $tb using $stb tags( $i )
$x = 0
while $x < $rowNum
$ts = $tsStart + $x
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x )
$x = $x + 60
endw
$totalRows = $totalRows + $x
print info: inserted $x rows into $tb and totalRows: $totalRows
$i = $i + 1
endw
sql select count(*) from $stb
sleep 1000
print data00 $data00
if $data00 != $totalRows then
return -1
endi
print ============== step3: start dnode3 and add into cluster , then alter replica from 1 to 2, and waiting sync
system sh/exec.sh -n dnode3 -s start
sql create dnode $hostname3
sleep 3000
sql alter database $db replica 2
sleep $sleepTimer
wait_dnode3_ready:
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode3_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
#$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
#$dnode5Status = $data4_5
if $dnode2Status != ready then
sleep 2000
goto wait_dnode3_ready
endi
if $dnode3Status != ready then
sleep 2000
goto wait_dnode3_ready
endi
sleep $sleepTimer
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
print ============== step4: stop dnode2 for checking if sync success
system sh/exec.sh -n dnode2 -s stop
sleep $sleepTimer
wait_dnode2_offline:
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode2_offline
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
#$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
#$dnode5Status = $data4_5
if $dnode2Status != offline then
sleep 2000
goto wait_dnode2_offline
endi
if $dnode3Status != ready then
sleep 2000
goto wait_dnode2_offline
endi
sleep $sleepTimer # waitting for move master vnode of dnode2 to dnode3
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
print ============== step5: restart dnode2
system sh/exec.sh -n dnode2 -s start
sleep 3000
wait_dnode2_ready:
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode2_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
#$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
#$dnode5Status = $data4_5
if $dnode2Status != ready then
sleep 2000
goto wait_dnode2_ready
endi
sleep $sleepTimer
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
print ============== step6: start dnode4 and add into cluster , then alter replica from 2 to 3, and waiting sync
system sh/exec.sh -n dnode4 -s start
sql create dnode $hostname4
sleep 3000
sql alter database $db replica 3
sleep $sleepTimer
wait_dnode4_ready:
sql show dnodes
if $rows != 4 then
sleep 2000
goto wait_dnode4_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
#$dnode1Status = $data4_1
#$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
#$dnode5Status = $data4_5
if $dnode4Status != ready then
sleep 2000
goto wait_dnode4_ready
endi
sleep $sleepTimer
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
print ============== step7: alter replica from 3 to 2, and waiting sync
sql alter database $db replica 2
sleep $sleepTimer
wait_vgroups_replic_to_2:
sql show vgroups
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
$thirdDnode_2 = $data8_1
$thirdDnode_3 = $data8_2
$thirdDnode_4 = $data8_3
$thirdDnode_5 = $data8_4
if $thirdDnode_2 != null then
sleep 2000
goto wait_vgroups_replic_to_2
endi
if $thirdDnode_3 != null then
sleep 2000
goto wait_vgroups_replic_to_2
endi
if $thirdDnode_4 != null then
sleep 2000
goto wait_vgroups_replic_to_2
endi
if $thirdDnode_5 != null then
sleep 2000
goto wait_vgroups_replic_to_2
endi
sleep $sleepTimer #waiting del one replica data
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
print ============== step8: alter replica from 2 to 1, and waiting sync
sql alter database $db replica 1
sleep $sleepTimer
wait_vgroups_replic_to_1:
sql show vgroups
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
$sencodDnode_2 = $data5_1
$sencodDnode_3 = $data5_2
$sencodDnode_4 = $data5_3
$sencodDnode_5 = $data5_4
if $sencodDnode_2 != null then
sleep 2000
goto wait_vgroups_replic_to_1
endi
if $sencodDnode_3 != null then
sleep 2000
goto wait_vgroups_replic_to_1
endi
if $sencodDnode_4 != null then
sleep 2000
goto wait_vgroups_replic_to_1
endi
if $sencodDnode_5 != null then
sleep 2000
goto wait_vgroups_replic_to_1
endi
all_dnodes_ready:
sql show dnodes
if $rows != 4 then
sleep 2000
goto all_dnodes_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
#$dnode5Status = $data4_5
if $dnode1Status != ready then
sleep 2000
goto all_dnodes_ready
endi
if $dnode2Status != ready then
sleep 2000
goto all_dnodes_ready
endi
if $dnode3Status != ready then
sleep 2000
goto all_dnodes_ready
endi
if $dnode4Status != ready then
sleep 2000
goto all_dnodes_ready
endi
sleep $sleepTimer #waiting del one replica data
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
print ============== step9: drop dnode2/dnode3
sql drop dnode $hostname2
sql drop dnode $hostname3
sleep $sleepTimer
wait_dnode23_dropped:
sql show dnodes
if $rows != 2 then
sleep 2000
goto wait_dnode23_dropped
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode2Status != null then
sleep 2000
goto wait_dnode23_dropped
endi
if $dnode3Status != null then
sleep 2000
goto wait_dnode23_dropped
endi
if $dnode4Status != ready then
return -1
endi
sleep $sleepTimer #waiting move vnode from dnode3/dnode3 to dnode4
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册