Unverified commit 36547256, authored by dapan1121, committed by GitHub

Merge pull request #7667 from taosdata/feature/szhou/sml-master

schemaless master delivery
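For reviewers unfamiliar with the feature: the tests in this diff drive the schemaless line-protocol insert entry point taos_insert_lines(). Below is a minimal sketch of how a C client calls it; the host, credentials, database name, and the particular tag/field set in the sample line are illustrative assumptions, not part of this PR.

// Minimal sketch of the schemaless insert call exercised by the tests in this PR.
// Host, credentials, database name and the sample line are placeholder assumptions.
#include <stdio.h>
#include "taos.h"       // TDengine C client API
#include "taoserror.h"  // tstrerror()

int main(void) {
  TAOS *taos = taos_connect("localhost", "root", "taosdata", "test", 0);
  if (taos == NULL) {
    printf("failed to connect\n");
    return -1;
  }

  // One row per line: measurement,tag_set field_set timestamp
  char *lines[] = {
      "sta,t0=true,t1=127i8,t11=\"binaryTagValue\" c0=true,c1=127i8,c11=\"binaryValue\" 1626006833639000us"};

  int code = taos_insert_lines(taos, lines, sizeof(lines) / sizeof(char *));
  if (code != 0) {
    printf("taos_insert_lines() failed, code:%d (%s)\n", code, tstrerror(code));
  }

  taos_close(taos);
  return 0;
}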
......@@ -486,6 +486,7 @@ bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, const char* sql);
int32_t tscInvalidOperationMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
......
......@@ -361,15 +361,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
}
(*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert
tscImportDataFromFile(pSql);
......
......@@ -1566,6 +1566,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pRes->qId = 0;
pRes->numOfRows = 1;
registerSqlObj(pSql);
strtolower(pSql->sqlstr, sql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
......@@ -1575,8 +1577,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.insertParam.numOfParams = 0;
pSql->cmd.batchSize = 0;
registerSqlObj(pSql);
int32_t ret = stmtParseInsertTbTags(pSql, pStmt);
if (ret != TSDB_CODE_SUCCESS) {
STMT_RET(ret);
......
......@@ -116,7 +116,7 @@ static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType);
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static int32_t validateOneTag(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
static bool hasNormalColumnFilter(SQueryInfo* pQueryInfo);
......@@ -1538,9 +1538,7 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
/*
* tags name /column name is truncated in sql.y
*/
bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
//const char* msg1 = "timestamp not allowed in tags";
const char* msg2 = "duplicated column names";
int32_t validateOneTag(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
const char* msg3 = "tag length too long";
const char* msg4 = "invalid tag name";
const char* msg5 = "invalid binary/nchar tag length";
......@@ -1555,8 +1553,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
// no more max columns
if (numOfTags + numOfCols >= TSDB_MAX_COLUMNS) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
// no more than 6 tags
......@@ -1564,8 +1561,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
char msg[128] = {0};
sprintf(msg, "tags no more than %d", TSDB_MAX_TAGS);
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
// no timestamp allowable
......@@ -1575,8 +1571,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
//}
if ((pTagField->type < TSDB_DATA_TYPE_BOOL) || (pTagField->type > TSDB_DATA_TYPE_UBIGINT)) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
......@@ -1588,20 +1583,17 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
// length less than TSDB_MAX_TAGS_LEN
if (nLen + pTagField->bytes > TSDB_MAX_TAGS_LEN) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
// tags name can not be a keyword
if (validateColumnName(pTagField->name) != TSDB_CODE_SUCCESS) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
// binary(val), val can not be equalled to or less than 0
if ((pTagField->type == TSDB_DATA_TYPE_BINARY || pTagField->type == TSDB_DATA_TYPE_NCHAR) && pTagField->bytes <= 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
// field name must be unique
......@@ -1609,17 +1601,15 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
for (int32_t i = 0; i < numOfTags + numOfCols; ++i) {
if (strncasecmp(pTagField->name, pSchema[i].name, sizeof(pTagField->name) - 1) == 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "duplicated column names");
}
}
return true;
return TSDB_CODE_SUCCESS;
}
bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
int32_t validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg1 = "too many columns";
const char* msg2 = "duplicated column names";
const char* msg3 = "column length too long";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid column name";
......@@ -1634,18 +1624,15 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
// no more max columns
if (numOfCols >= TSDB_MAX_COLUMNS || numOfTags + numOfCols >= TSDB_MAX_COLUMNS) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pColField->type < TSDB_DATA_TYPE_BOOL || pColField->type > TSDB_DATA_TYPE_UBIGINT) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
if (validateColumnName(pColField->name) != TSDB_CODE_SUCCESS) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
SSchema* pSchema = tscGetTableSchema(pTableMeta);
......@@ -1656,25 +1643,23 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
}
if (pColField->bytes <= 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
// length less than TSDB_MAX_BYTES_PER_ROW
if (nLen + pColField->bytes > TSDB_MAX_BYTES_PER_ROW) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return false;
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
// field name must be unique
for (int32_t i = 0; i < numOfTags + numOfCols; ++i) {
if (strncasecmp(pColField->name, pSchema[i].name, sizeof(pColField->name) - 1) == 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return false;
//return tscErrorMsgWithCode(TSDB_CODE_TSC_DUP_COL_NAMES, tscGetErrorMsgPayload(pCmd), pColField->name, NULL);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "duplicated column names");
}
}
return true;
return TSDB_CODE_SUCCESS;
}
/* is contained in pFieldList or not */
......@@ -5792,7 +5777,6 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg19 = "invalid new tag name";
const char* msg20 = "table is not super table";
const char* msg21 = "only binary/nchar column length could be modified";
const char* msg22 = "new column length should be bigger than old one";
const char* msg23 = "only column length could be modified";
const char* msg24 = "invalid binary/nchar column length";
......@@ -5844,8 +5828,9 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
TAOS_FIELD* p = taosArrayGet(pFieldList, 0);
if (!validateOneTags(pCmd, p)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
int32_t ret = validateOneTag(pCmd, p);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, p);
......@@ -6021,8 +6006,9 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
TAOS_FIELD* p = taosArrayGet(pFieldList, 0);
if (!validateOneColumn(pCmd, p)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
int32_t ret = validateOneColumn(pCmd, p);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, p);
......@@ -6085,7 +6071,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
if (pItem->bytes <= pColSchema->bytes) {
return invalidOperationMsg(pMsg, msg22);
return tscErrorMsgWithCode(TSDB_CODE_TSC_INVALID_COLUMN_LENGTH, pMsg, pItem->name, NULL);
}
SSchema* pSchema = (SSchema*) pTableMetaInfo->pTableMeta->schema;
......@@ -6136,7 +6122,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
if (pItem->bytes <= pColSchema->bytes) {
return invalidOperationMsg(pMsg, msg22);
return tscErrorMsgWithCode(TSDB_CODE_TSC_INVALID_TAG_LENGTH, pMsg, pItem->name, NULL);
}
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
......
......@@ -4073,6 +4073,31 @@ int32_t tscInvalidOperationMsg(char* msg, const char* additionalInfo, const char
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, const char* sql) {
const char* msgFormat1 = "%s:%s";
const char* msgFormat2 = "%s:\'%s\' (%s)";
const char* msgFormat3 = "%s:\'%s\'";
const int32_t BACKWARD_CHAR_STEP = 0;
if (sql == NULL) {
assert(errMsg != NULL);
sprintf(dstBuffer, msgFormat1, tstrerror(code), errMsg);
return code;
}
char buf[64] = {0}; // only extract part of sql string
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
if (errMsg != NULL) {
sprintf(dstBuffer, msgFormat2, tstrerror(code), buf, errMsg);
} else {
sprintf(dstBuffer, msgFormat3, tstrerror(code), buf); // no additional information for invalid sql error
}
return code;
}
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
......
......@@ -103,6 +103,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) //"File is empty")
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) //"Syntax error in Line")
#define TSDB_CODE_TSC_NO_META_CACHED TAOS_DEF_ERROR_CODE(0, 0x021C) //"No table meta cached")
#define TSDB_CODE_TSC_DUP_COL_NAMES TAOS_DEF_ERROR_CODE(0, 0x021D) //"duplicated column names")
#define TSDB_CODE_TSC_INVALID_TAG_LENGTH TAOS_DEF_ERROR_CODE(0, 0x021E) //"Invalid tag length")
#define TSDB_CODE_TSC_INVALID_COLUMN_LENGTH TAOS_DEF_ERROR_CODE(0, 0x021F) //"Invalid column length")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
......@@ -185,6 +188,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0374) //"Invalid func")
#define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x0375) //"Invalid func bufSize")
#define TSDB_CODE_MND_INVALID_TAG_LENGTH TAOS_DEF_ERROR_CODE(0, 0x0376) //"invalid tag length")
#define TSDB_CODE_MND_INVALID_COLUMN_LENGTH TAOS_DEF_ERROR_CODE(0, 0x0377) //"invalid column length")
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) //"Database not specified or available")
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //"Database already exists")
#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0382) //"Invalid database options")
......
......@@ -1518,6 +1518,13 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
// update
SSchema *schema = (SSchema *) (pStable->schema + col);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
if (pAlter->schema[0].bytes <= schema->bytes) {
mError("msg:%p, app:%p stable:%s, modify column len. column:%s, len from %d to %d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, name, schema->bytes, pAlter->schema[0].bytes);
return TSDB_CODE_MND_INVALID_COLUMN_LENGTH;
}
schema->bytes = pAlter->schema[0].bytes;
pStable->sversion++;
mInfo("msg:%p, app:%p stable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
......@@ -1548,6 +1555,12 @@ static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) {
// update
SSchema *schema = (SSchema *) (pStable->schema + col + pStable->numOfColumns);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
if (pAlter->schema[0].bytes <= schema->bytes) {
mError("msg:%p, app:%p stable:%s, modify tag len. tag:%s, len from %d to %d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, name, schema->bytes, pAlter->schema[0].bytes);
return TSDB_CODE_MND_INVALID_TAG_LENGTH;
}
schema->bytes = pAlter->schema[0].bytes;
pStable->tversion++;
mInfo("msg:%p, app:%p stable %s, start to modify tag len %s to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
......
......@@ -112,6 +112,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too lon
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_META_CACHED, "No table meta cached")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_COL_NAMES, "duplicated column names")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TAG_LENGTH, "Invalid tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_COLUMN_LENGTH, "Invalid column length")
// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
......@@ -194,6 +197,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_ALREADY_EXIST, "Func already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC, "Invalid func")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TAG_LENGTH, "invalid tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_COLUMN_LENGTH, "invalid column length")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "Database not specified or available")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options")
......
......@@ -58,11 +58,7 @@ int main(int argc, char* argv[]) {
time_t ct = time(0);
int64_t ts = ct * 1000;
char* lineFormat =
"sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11="
"\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
int l = 0;
......@@ -76,7 +72,7 @@ int main(int argc, char* argv[]) {
}
}
}
shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
//shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_insert_lines");
int64_t begin = getTimeInUs();
......@@ -84,140 +80,5 @@ int main(int argc, char* argv[]) {
int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %" PRId64 "\n", code, tstrerror(code), end - begin);
char* lines_000_0[] = {
"sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7="
"2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12="
"L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" "
"1626006833639000us"};
code = taos_insert_lines(taos, lines_000_0, sizeof(lines_000_0) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_0 should return error\n");
return -1;
}
char* lines_000_1[] = {
"sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,"
"t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12="
"L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" "
"1626006833639001"};
code = taos_insert_lines(taos, lines_000_1, sizeof(lines_000_1) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_1 should return error\n");
return -1;
}
char* lines_000_2[] = {
"sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
"22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"};
code = taos_insert_lines(taos, lines_000_2, sizeof(lines_000_2) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_000_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_0[] = {
"sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us",
};
code = taos_insert_lines(taos, lines_001_0, sizeof(lines_001_0) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_0 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_1[] = {
"sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
"22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"};
code = taos_insert_lines(taos, lines_001_1, sizeof(lines_001_1) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_2[] = {
"sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
"22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 0"};
code = taos_insert_lines(taos, lines_001_2, sizeof(lines_001_2) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_002[] = {
"stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006834s"};
code = taos_insert_lines(taos, lines_002, sizeof(lines_002) / sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_002 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
// Duplicate key check;
char* lines_003_1[] = {"std,id=\"std_3_1\",t1=4i64,Id=\"std\",t2=true c1=true 1626006834s"};
code = taos_insert_lines(taos, lines_003_1, sizeof(lines_003_1) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_003_2[] = {"std,id=\"std_3_2\",tag1=4i64,Tag2=true,tAg3=2,TaG2=\"dup!\" c1=true 1626006834s"};
code = taos_insert_lines(taos, lines_003_2, sizeof(lines_003_2) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_003_3[] = {"std,id=\"std_3_3\",tag1=4i64 field1=true,Field2=2,FIElD1=\"dup!\",fIeLd4=true 1626006834s"};
code = taos_insert_lines(taos, lines_003_3, sizeof(lines_003_3) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_3 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_003_4[] = {
"std,id=\"std_3_4\",tag1=4i64,dupkey=4i16,tag2=T field1=true,dUpkEy=1e3f32,field2=\"1234\" 1626006834s"};
code = taos_insert_lines(taos, lines_003_4, sizeof(lines_003_4) / sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_003_4 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
return 0;
}
......@@ -1053,7 +1053,7 @@ class TDTestCase:
s_stb_d_tb_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[5]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_a_col_m_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(6)
tdSql.checkRows(3)
def sStbDtbDdataAtMcInsertMultiThreadCheckCase(self):
"""
......@@ -1123,7 +1123,7 @@ class TDTestCase:
tdSql.checkRows(5)
for t in ["t10", "t11"]:
tdSql.query(f"select * from {stb_name} where {t} is not NULL;")
tdSql.checkRows(6)
tdSql.checkRows(0)
def sStbDtbDdataDtsInsertMultiThreadCheckCase(self):
"""
......@@ -1150,7 +1150,7 @@ class TDTestCase:
s_stb_d_tb_d_ts_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[11]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_d_ts_a_col_m_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(6)
tdSql.checkRows(3)
def test(self):
input_sql1 = "rfasta,id=\"rfasta_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"ddzhiksj\",t8=L\"ncharTagValue\" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"bnhwlgvj\",c8=L\"ncharTagValue\",c9=7u64 1626006933640000000ns"
......@@ -1203,19 +1203,19 @@ class TDTestCase:
self.sStbDtbDdataInsertMultiThreadCheckCase()
# # ! concurrency conflict
# self.sStbDtbDdataAcMtInsertMultiThreadCheckCase()
# self.sStbDtbDdataAtMcInsertMultiThreadCheckCase()
self.sStbDtbDdataAcMtInsertMultiThreadCheckCase()
self.sStbDtbDdataAtMcInsertMultiThreadCheckCase()
self.sStbStbDdataDtsInsertMultiThreadCheckCase()
# # ! concurrency conflict
# self.sStbStbDdataDtsAcMtInsertMultiThreadCheckCase()
# self.sStbStbDdataDtsAtMcInsertMultiThreadCheckCase()
self.sStbStbDdataDtsAcMtInsertMultiThreadCheckCase()
self.sStbStbDdataDtsAtMcInsertMultiThreadCheckCase()
self.sStbDtbDdataDtsInsertMultiThreadCheckCase()
# ! concurrency conflict
# self.sStbDtbDdataDtsAcMtInsertMultiThreadCheckCase()
self.sStbDtbDdataDtsAcMtInsertMultiThreadCheckCase()
......
This diff is collapsed.
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import time
from copy import deepcopy
import numpy as np
from util.log import *
from util.cases import *
from util.sql import *
from util.common import tdCom
import threading
import itertools
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
self.lock = threading.Lock()
def genMultiColStr(self, int_count=4, double_count=0, binary_count=0):
'''
related to self.getPerfSql()
:count = 4 ---> 4 int
:count = 1000 ---> 400 int 400 double 200 binary(128)
:count = 4000 ---> 1900 int 1900 double 200 binary(128)
'''
col_str = ""
if double_count == 0 and binary_count == 0:
for i in range(0, int_count):
if i < (int_count-1):
col_str += f'c{i}={random.randint(0, 255)}i32,'
else:
col_str += f'c{i}={random.randint(0, 255)}i32 '
elif double_count > 0 and binary_count == 0:
for i in range(0, int_count):
col_str += f'c{i}={random.randint(0, 255)}i32,'
for i in range(0, double_count):
if i < (double_count-1):
col_str += f'c{i+int_count}={random.randint(1, 255)}.{i}f64,'
else:
col_str += f'c{i+int_count}={random.randint(1, 255)}.{i}f64 '
elif double_count == 0 and binary_count > 0:
for i in range(0, int_count):
col_str += f'c{i}={random.randint(0, 255)}i32,'
for i in range(0, binary_count):
if i < (binary_count-1):
col_str += f'c{i+int_count}=\"{tdCom.getLongName(5, "letters")}\",'
else:
col_str += f'c{i+int_count}=\"{tdCom.getLongName(5, "letters")}\" '
elif double_count > 0 and binary_count > 0:
for i in range(0, int_count):
col_str += f'c{i}={random.randint(0, 255)}i32,'
for i in range(0, double_count):
col_str += f'c{i+int_count}={random.randint(1, 255)}.{i}f64,'
for i in range(0, binary_count):
if i < (binary_count-1):
col_str += f'c{i+int_count+double_count}=\"{tdCom.getLongName(5, "letters")}\",'
else:
col_str += f'c{i+int_count+double_count}=\"{tdCom.getLongName(5, "letters")}\" '
return col_str
def genLongSql(self, int_count=4, double_count=0, binary_count=0, init=False):
'''
:init ---> stb insert line
'''
if init:
tag_str = f'id="init",t0={random.randint(0, 65535)}i32,t1=\"{tdCom.getLongName(10, "letters")}\"'
else:
tag_str = f'id="sub_{tdCom.getLongName(5, "letters")}_{tdCom.getLongName(5, "letters")}",t0={random.randint(0, 65535)}i32,t1=\"{tdCom.getLongName(10, "letters")}\"'
col_str = self.genMultiColStr(int_count=int_count, double_count=double_count, binary_count=binary_count)
long_sql = 'stb' + ',' + tag_str + ' ' + col_str + '0'
return long_sql
def getPerfSql(self, count=4, init=False):
'''
:count = 4 ---> 4 int
:count = 1000 ---> 400 int 400 double 200 binary(128)
:count = 4000 ---> 1900 int 1900 double 200 binary(128)
'''
if count == 4:
input_sql = self.genLongSql(init=init)
elif count == 1000:
input_sql = self.genLongSql(400, 400, 200, init=init)
elif count == 4000:
input_sql = self.genLongSql(1900, 1900, 200, init=init)
return input_sql
def replaceLastStr(self, str, new):
'''
replace last element of str to new element
'''
list_ori = list(str)
list_ori[-1] = new
return ''.join(list_ori)
def createStb(self, count=4):
'''
create 1 stb
'''
input_sql = self.getPerfSql(count=count, init=True)
self._conn.insertLines([input_sql])
def batchCreateTable(self, batch_list):
'''
schemaless insert api
'''
print(threading.current_thread().name, "length=", len(batch_list))
print(threading.current_thread().name, 'firstline', batch_list[0][0:50], '...', batch_list[0][-50:-1])
print(threading.current_thread().name, 'lastline:', batch_list[-1][0:50], '...', batch_list[-1][-50:-1])
self._conn.insertLines(batch_list)
print(threading.current_thread().name, 'end')
def splitGenerator(self, table_list, sub_list_len):
'''
split a list to n piece of sub_list
[a, b, c, d] ---> [[a, b], [c, d]]
yield type ---> generator
'''
for i in range(0, len(table_list), sub_list_len):
yield table_list[i:i + sub_list_len]
def genTbListGenerator(self, table_list, sub_list_len):
'''
split table_list, after split, every sub_list len is sub_list_len
'''
table_list_generator = self.splitGenerator(table_list, sub_list_len)
return table_list_generator
def genTableList(self, count=4, table_count=10000):
'''
gen len(table_count) table_list
'''
table_list = list()
for i in range(table_count):
table_list.append(self.getPerfSql(count=count))
return table_list
def threadCreateTables(self, table_list_generator, thread_count=10):
'''
thread create tables
'''
threads = list()
for i in range(thread_count):
t = threading.Thread(target=self.batchCreateTable, args=(next(table_list_generator),))
threads.append(t)
return threads
def batchInsertRows(self, table_list, rows_count):
'''
add rows in each table ---> count=rows_count
'''
for input_sql in table_list:
ts = int(time.time())
input_sql_list = list()
for i in range(rows_count-1):
ts -= 1
elm_new = self.replaceLastStr(input_sql, str(ts)) + 's'
input_sql_list.append(elm_new)
self.batchCreateTable(input_sql_list)
def threadsInsertRows(self, rows_generator, rows_count=1000, thread_count=10):
'''
multi insert rows in each table
'''
threads = list()
for i in range(thread_count):
self.lock.acquire()
t = threading.Thread(target=self.batchInsertRows, args=(next(rows_generator), rows_count,))
threads.append(t)
self.lock.release()
return threads
def multiThreadRun(self, threads):
'''
multi run threads
'''
for t in threads:
t.start()
for t in threads:
t.join()
def createTables(self, count, table_count=10000, sub_list_len=1000, thread_count=10):
'''
create stb and tb
'''
table_list = self.genTableList(count=count, table_count=table_count)
create_tables_start_time = time.time()
self.createStb()
table_list_generator = self.genTbListGenerator(table_list, sub_list_len)
create_tables_generator, insert_rows_generator = itertools.tee(table_list_generator, 2)
self.multiThreadRun(self.threadCreateTables(table_list_generator=create_tables_generator, thread_count=thread_count))
create_tables_end_time = time.time()
create_tables_time = int(create_tables_end_time - create_tables_start_time)
return_str = f'create tables\' time of {count} columns ---> {create_tables_time}s'
return insert_rows_generator, create_tables_time, return_str
def insertRows(self, count, rows_generator, rows_count=1000, thread_count=10):
'''
insert rows
'''
insert_rows_start_time = time.time()
self.multiThreadRun(self.threadsInsertRows(rows_generator=rows_generator, rows_count=rows_count, thread_count=thread_count))
insert_rows_end_time = time.time()
insert_rows_time = int(insert_rows_end_time - insert_rows_start_time)
return_str = f'insert rows\' time of {count} columns ---> {insert_rows_time}s'
return insert_rows_time, return_str
def schemalessPerfTest(self, count, table_count=10000, sub_list_len=1000, thread_count=10):
'''
get performance
'''
insert_rows_generator = self.createTables(count=count, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
return self.insertRows(count=count, rows_generator=insert_rows_generator, rows_count=1000, thread_count=10)
def getPerfResults(self, test_times=3, table_count=10000, sub_list_len=1000, thread_count=10):
col4_time = 0
col1000_time = 0
col4000_time = 0
# for i in range(test_times):
# time_used = self.schemalessPerfTest(count=4, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
# col4_time += time_used
# col4_time /= test_times
# print(col4_time)
tdCom.cleanTb()
for i in range(test_times):
time_used = self.schemalessPerfTest(count=1000, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
col1000_time += time_used
col1000_time /= test_times
print(col1000_time)
tdCom.cleanTb()
for i in range(test_times):
time_used = self.schemalessPerfTest(count=4000, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
col4000_time += time_used
col4000_time /= test_times
print(col4000_time)
return col4_time, col1000_time, col4000_time
def run(self):
print("running {}".format(__file__))
tdSql.prepare()
result = self.getPerfResults(test_times=1, table_count=1000, sub_list_len=100, thread_count=10)
print(result)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
......@@ -50,4 +50,4 @@ class TDCom:
def close(self):
self.cursor.close()
tdCom = TDCom()
\ No newline at end of file
tdCom = TDCom()