Commit b6f9308c authored by T twalthr

[FLINK-3891] [table] Add a class containing all supported Table API types

This closes #2292.
Parent 2f9a28ae
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}

/**
 * This class enumerates all supported types of the Table API.
 */
object Types {

  val STRING = BasicTypeInfo.STRING_TYPE_INFO
  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
  val INT = BasicTypeInfo.INT_TYPE_INFO
  val LONG = BasicTypeInfo.LONG_TYPE_INFO
  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO

  val DATE = SqlTimeTypeInfo.DATE
  val TIME = SqlTimeTypeInfo.TIME
  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP

}
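For reference, a minimal usage sketch of the new object in the Scala Table API (the environment setup, the object name, and the input data below are illustrative assumptions, not part of this commit): field casts go through the Types constants instead of BasicTypeInfo/SqlTimeTypeInfo.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment, Types}

// Hypothetical example object, not part of this commit.
object TypesUsageSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Cast tuple fields via the Types constants rather than BasicTypeInfo / SqlTimeTypeInfo.
    val result = env.fromElements((1, "2011-05-03"))
      .toTable(tEnv)
      .select('_1.cast(Types.LONG), '_2.cast(Types.DATE))

    result.toDataSet[Row].collect().foreach(println)
  }
}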
@@ -138,50 +138,5 @@ public class CastingITCase extends TableProgramsTestBase {
String expected = "1,1,1,1,2.0,2.0,true\n";
compareResultAsText(results, expected);
}
@Ignore // Date type not supported yet
@Test
public void testCastDateFromString() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSource<Tuple4<String, String, String, String>> input =
env.fromElements(new Tuple4<>("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"));
Table table =
tableEnv.fromDataSet(input);
Table result = table
.select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
.select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
"1970-01-17 17:47:53.775\n";
compareResultAsText(results, expected);
}
@Ignore // Date type not supported yet
@Test
public void testCastDateToStringAndLong() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSource<Tuple2<String, String>> input =
env.fromElements(new Tuple2<>("2011-05-03 15:51:36.000", "1304437896000"));
Table table =
tableEnv.fromDataSet(input);
Table result = table
.select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1")
.select("f0.cast(STRING), f0.cast(LONG), f1.cast(STRING), f1.cast(LONG)");
DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
compareResultAsText(results, expected);
}
}
@@ -20,10 +20,9 @@ package org.apache.flink.api.scala.batch.table
import java.util.Date
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}
import org.apache.flink.api.table.{Row, TableEnvironment, Types}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit._
@@ -95,25 +94,25 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
.toTable(tEnv)
.select(
// * -> String
'_1.cast(BasicTypeInfo.STRING_TYPE_INFO),
'_2.cast(BasicTypeInfo.STRING_TYPE_INFO),
'_3.cast(BasicTypeInfo.STRING_TYPE_INFO),
'_4.cast(BasicTypeInfo.STRING_TYPE_INFO),
'_1.cast(Types.STRING),
'_2.cast(Types.STRING),
'_3.cast(Types.STRING),
'_4.cast(Types.STRING),
// NUMERIC TYPE -> Boolean
'_1.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
'_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
'_3.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
'_1.cast(Types.BOOLEAN),
'_2.cast(Types.BOOLEAN),
'_3.cast(Types.BOOLEAN),
// NUMERIC TYPE -> NUMERIC TYPE
'_1.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
'_2.cast(BasicTypeInfo.INT_TYPE_INFO),
'_3.cast(BasicTypeInfo.SHORT_TYPE_INFO),
'_1.cast(Types.DOUBLE),
'_2.cast(Types.INT),
'_3.cast(Types.SHORT),
// Boolean -> NUMERIC TYPE
'_4.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
'_4.cast(Types.DOUBLE),
// identity casting
'_1.cast(BasicTypeInfo.INT_TYPE_INFO),
'_2.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
'_3.cast(BasicTypeInfo.LONG_TYPE_INFO),
'_4.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
'_1.cast(Types.INT),
'_2.cast(Types.DOUBLE),
'_3.cast(Types.LONG),
'_4.cast(Types.BOOLEAN))
val expected = "1,0.0,1,true," +
"true,false,true," +
@@ -134,13 +133,13 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
.toTable(tEnv)
.select(
// String -> BASIC TYPE (not String, Date, Void, Character)
'_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
'_1.cast(BasicTypeInfo.SHORT_TYPE_INFO),
'_1.cast(BasicTypeInfo.INT_TYPE_INFO),
'_1.cast(BasicTypeInfo.LONG_TYPE_INFO),
'_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
'_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
'_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
'_1.cast(Types.BYTE),
'_1.cast(Types.SHORT),
'_1.cast(Types.INT),
'_1.cast(Types.LONG),
'_3.cast(Types.DOUBLE),
'_3.cast(Types.FLOAT),
'_2.cast(Types.BOOLEAN))
val expected = "1,1,1,1,2.0,2.0,true\n"
val results = t.toDataSet[Row].collect()
@@ -157,10 +156,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val t = env.fromElements(("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
.toTable(tEnv)
.select(
'_1.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
'_2.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
'_3.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
'_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
'_1.cast(Types.DATE).cast(Types.STRING),
'_2.cast(Types.DATE).cast(Types.STRING),
'_3.cast(Types.DATE).cast(Types.STRING),
'_4.cast(Types.DATE).cast(Types.STRING))
val expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
"1970-01-17 17:47:53.775\n"
@@ -176,12 +175,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
val t = ds.toTable(tEnv)
.select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0),
'_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1))
.select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO),
'f0.cast(BasicTypeInfo.LONG_TYPE_INFO),
'f1.cast(BasicTypeInfo.STRING_TYPE_INFO),
'f1.cast(BasicTypeInfo.LONG_TYPE_INFO))
.select('_1.cast(Types.DATE).as('f0),
'_2.cast(Types.DATE).as('f1))
.select('f0.cast(Types.STRING),
'f0.cast(Types.LONG),
'f1.cast(Types.STRING),
'f1.cast(Types.LONG))
val expected = "2011-05-03 15:51:36.000,1304437896000," +
"2011-05-03 15:51:36.000,1304437896000\n"
@@ -20,14 +20,13 @@ package org.apache.flink.api.scala.batch.table
import java.sql.{Date, Time, Timestamp}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.codegen.CodeGenException
import org.apache.flink.api.table.expressions.Null
import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
import org.apache.flink.api.table.{Row, TableEnvironment, Types, ValidationException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit.Assert._
@@ -108,8 +107,8 @@ class ExpressionsITCase(
.select(
'a,
'b,
Null(BasicTypeInfo.INT_TYPE_INFO),
Null(BasicTypeInfo.STRING_TYPE_INFO) === "")
Null(Types.INT),
Null(Types.STRING) === "")
try {
val ds = t.toDataSet[Row]
@@ -20,11 +20,11 @@ package org.apache.flink.api.scala.expression
import java.sql.{Date, Time, Timestamp}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.table.{Row, Types}
import org.junit.Test
class TimeTypesTest extends ExpressionTestBase {
@@ -43,7 +43,7 @@ class TimeTypesTest extends ExpressionTestBase {
"2040-09-11")
testAllApis(
"1500-04-30".cast(SqlTimeTypeInfo.DATE),
"1500-04-30".cast(Types.DATE),
"'1500-04-30'.cast(DATE)",
"CAST('1500-04-30' AS DATE)",
"1500-04-30")
@@ -60,7 +60,7 @@ class TimeTypesTest extends ExpressionTestBase {
"00:00:00")
testAllApis(
"1:30:00".cast(SqlTimeTypeInfo.TIME),
"1:30:00".cast(Types.TIME),
"'1:30:00'.cast(TIME)",
"CAST('1:30:00' AS TIME)",
"01:30:00")
@@ -77,7 +77,7 @@ class TimeTypesTest extends ExpressionTestBase {
"2040-09-11 00:00:00.0")
testAllApis(
"1500-04-30 12:00:00".cast(SqlTimeTypeInfo.TIMESTAMP),
"1500-04-30 12:00:00".cast(Types.TIMESTAMP),
"'1500-04-30 12:00:00'.cast(TIMESTAMP)",
"CAST('1500-04-30 12:00:00' AS TIMESTAMP)",
"1500-04-30 12:00:00.0")
@@ -107,62 +107,62 @@ class TimeTypesTest extends ExpressionTestBase {
@Test
def testTimeCasting(): Unit = {
testAllApis(
'f0.cast(SqlTimeTypeInfo.TIMESTAMP),
'f0.cast(Types.TIMESTAMP),
"f0.cast(TIMESTAMP)",
"CAST(f0 AS TIMESTAMP)",
"1990-10-14 00:00:00.0")
testAllApis(
'f1.cast(SqlTimeTypeInfo.TIMESTAMP),
'f1.cast(Types.TIMESTAMP),
"f1.cast(TIMESTAMP)",
"CAST(f1 AS TIMESTAMP)",
"1970-01-01 10:20:45.0")
testAllApis(
'f2.cast(SqlTimeTypeInfo.DATE),
'f2.cast(Types.DATE),
"f2.cast(DATE)",
"CAST(f2 AS DATE)",
"1990-10-14")
testAllApis(
'f2.cast(SqlTimeTypeInfo.TIME),
'f2.cast(Types.TIME),
"f2.cast(TIME)",
"CAST(f2 AS TIME)",
"10:20:45")
testAllApis(
'f2.cast(SqlTimeTypeInfo.TIME),
'f2.cast(Types.TIME),
"f2.cast(TIME)",
"CAST(f2 AS TIME)",
"10:20:45")
testTableApi(
'f7.cast(SqlTimeTypeInfo.DATE),
'f7.cast(Types.DATE),
"f7.cast(DATE)",
"2002-11-09")
testTableApi(
'f7.cast(SqlTimeTypeInfo.DATE).cast(BasicTypeInfo.INT_TYPE_INFO),
'f7.cast(Types.DATE).cast(Types.INT),
"f7.cast(DATE).cast(INT)",
"12000")
testTableApi(
'f7.cast(SqlTimeTypeInfo.TIME),
'f7.cast(Types.TIME),
"f7.cast(TIME)",
"00:00:12")
testTableApi(
'f7.cast(SqlTimeTypeInfo.TIME).cast(BasicTypeInfo.INT_TYPE_INFO),
'f7.cast(Types.TIME).cast(Types.INT),
"f7.cast(TIME).cast(INT)",
"12000")
testTableApi(
'f8.cast(SqlTimeTypeInfo.TIMESTAMP),
'f8.cast(Types.TIMESTAMP),
"f8.cast(TIMESTAMP)",
"2016-06-27 07:23:33.0")
testTableApi(
'f8.cast(SqlTimeTypeInfo.TIMESTAMP).cast(BasicTypeInfo.LONG_TYPE_INFO),
'f8.cast(Types.TIMESTAMP).cast(Types.LONG),
"f8.cast(TIMESTAMP).cast(LONG)",
"1467012213000")
}
@@ -188,13 +188,13 @@ class TimeTypesTest extends ExpressionTestBase {
"false")
testAllApis(
'f0.cast(SqlTimeTypeInfo.TIMESTAMP) !== 'f2,
'f0.cast(Types.TIMESTAMP) !== 'f2,
"f0.cast(TIMESTAMP) !== f2",
"CAST(f0 AS TIMESTAMP) <> f2",
"true")
testAllApis(
'f0.cast(SqlTimeTypeInfo.TIMESTAMP) === 'f6,
'f0.cast(Types.TIMESTAMP) === 'f6,
"f0.cast(TIMESTAMP) === f6",
"CAST(f0 AS TIMESTAMP) = f6",
"true")
@@ -218,14 +218,14 @@ class TimeTypesTest extends ExpressionTestBase {
def typeInfo = {
new RowTypeInfo(Seq(
SqlTimeTypeInfo.DATE,
SqlTimeTypeInfo.TIME,
SqlTimeTypeInfo.TIMESTAMP,
SqlTimeTypeInfo.DATE,
SqlTimeTypeInfo.DATE,
SqlTimeTypeInfo.TIME,
SqlTimeTypeInfo.TIMESTAMP,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
Types.DATE,
Types.TIME,
Types.TIMESTAMP,
Types.DATE,
Types.DATE,
Types.TIME,
Types.TIMESTAMP,
Types.INT,
Types.LONG)).asInstanceOf[TypeInformation[Any]]
}
}
@@ -18,10 +18,9 @@
package org.apache.flink.api.table.expressions
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.{Row, Types}
import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.junit.Test
@@ -79,37 +78,37 @@ class DecimalTypeTest extends ExpressionTestBase {
Double.MinValue.toString)
testAllApis(
Double.MinValue.cast(FLOAT_TYPE_INFO),
Double.MinValue.cast(Types.FLOAT),
s"${Double.MinValue}.cast(FLOAT)",
s"CAST(${Double.MinValue} AS FLOAT)",
Float.NegativeInfinity.toString)
testAllApis(
Byte.MinValue.cast(BYTE_TYPE_INFO),
Byte.MinValue.cast(Types.BYTE),
s"(${Byte.MinValue}).cast(BYTE)",
s"CAST(${Byte.MinValue} AS TINYINT)",
Byte.MinValue.toString)
testAllApis(
Byte.MinValue.cast(BYTE_TYPE_INFO) - 1.cast(BYTE_TYPE_INFO),
Byte.MinValue.cast(Types.BYTE) - 1.cast(Types.BYTE),
s"(${Byte.MinValue}).cast(BYTE) - (1).cast(BYTE)",
s"CAST(${Byte.MinValue} AS TINYINT) - CAST(1 AS TINYINT)",
Byte.MaxValue.toString)
testAllApis(
Short.MinValue.cast(SHORT_TYPE_INFO),
Short.MinValue.cast(Types.SHORT),
s"(${Short.MinValue}).cast(SHORT)",
s"CAST(${Short.MinValue} AS SMALLINT)",
Short.MinValue.toString)
testAllApis(
Int.MinValue.cast(INT_TYPE_INFO) - 1,
Int.MinValue.cast(Types.INT) - 1,
s"(${Int.MinValue}).cast(INT) - 1",
s"CAST(${Int.MinValue} AS INT) - 1",
Int.MaxValue.toString)
testAllApis(
Long.MinValue.cast(LONG_TYPE_INFO),
Long.MinValue.cast(Types.LONG),
s"(${Long.MinValue}L).cast(LONG)",
s"CAST(${Long.MinValue} AS BIGINT)",
Long.MinValue.toString)
@@ -119,51 +118,51 @@ class DecimalTypeTest extends ExpressionTestBase {
def testDecimalCasting(): Unit = {
// from String
testTableApi(
"123456789123456789123456789".cast(BIG_DEC_TYPE_INFO),
"123456789123456789123456789".cast(Types.DECIMAL),
"'123456789123456789123456789'.cast(DECIMAL)",
"123456789123456789123456789")
// from double
testAllApis(
'f3.cast(BIG_DEC_TYPE_INFO),
'f3.cast(Types.DECIMAL),
"f3.cast(DECIMAL)",
"CAST(f3 AS DECIMAL)",
"4.2")
// to double
testAllApis(
'f0.cast(DOUBLE_TYPE_INFO),
'f0.cast(Types.DOUBLE),
"f0.cast(DOUBLE)",
"CAST(f0 AS DOUBLE)",
"1.2345678912345679E8")
// to int
testAllApis(
'f4.cast(INT_TYPE_INFO),
'f4.cast(Types.INT),
"f4.cast(INT)",
"CAST(f4 AS INT)",
"123456789")
// to long
testAllApis(
'f4.cast(LONG_TYPE_INFO),
'f4.cast(Types.LONG),
"f4.cast(LONG)",
"CAST(f4 AS BIGINT)",
"123456789")
// to boolean (not SQL compliant)
testTableApi(
'f1.cast(BOOLEAN_TYPE_INFO),
'f1.cast(Types.BOOLEAN),
"f1.cast(BOOL)",
"true")
testTableApi(
'f5.cast(BOOLEAN_TYPE_INFO),
'f5.cast(Types.BOOLEAN),
"f5.cast(BOOL)",
"false")
testTableApi(
BigDecimal("123456789.123456789123456789").cast(DOUBLE_TYPE_INFO),
BigDecimal("123456789.123456789123456789").cast(Types.DOUBLE),
"(123456789.123456789123456789p).cast(DOUBLE)",
"1.2345678912345679E8")
}
@@ -300,12 +299,12 @@ class DecimalTypeTest extends ExpressionTestBase {
def typeInfo = {
new RowTypeInfo(Seq(
BIG_DEC_TYPE_INFO,
BIG_DEC_TYPE_INFO,
INT_TYPE_INFO,
DOUBLE_TYPE_INFO,
BIG_DEC_TYPE_INFO,
BIG_DEC_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
Types.DECIMAL,
Types.DECIMAL,
Types.INT,
Types.DOUBLE,
Types.DECIMAL,
Types.DECIMAL)).asInstanceOf[TypeInformation[Any]]
}
}
@@ -18,10 +18,9 @@
package org.apache.flink.api.table.expressions
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.{Row, Types}
import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.junit.Test
@@ -484,21 +483,21 @@ class ScalarFunctionsTest extends ExpressionTestBase {
def typeInfo = {
new RowTypeInfo(Seq(
STRING_TYPE_INFO,
BOOLEAN_TYPE_INFO,
BYTE_TYPE_INFO,
SHORT_TYPE_INFO,
LONG_TYPE_INFO,
FLOAT_TYPE_INFO,
DOUBLE_TYPE_INFO,
INT_TYPE_INFO,
STRING_TYPE_INFO,
BYTE_TYPE_INFO,
SHORT_TYPE_INFO,
LONG_TYPE_INFO,
FLOAT_TYPE_INFO,
DOUBLE_TYPE_INFO,
INT_TYPE_INFO,
BIG_DEC_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
Types.STRING,
Types.BOOLEAN,
Types.BYTE,
Types.SHORT,
Types.LONG,
Types.FLOAT,
Types.DOUBLE,
Types.INT,
Types.STRING,
Types.BYTE,
Types.SHORT,
Types.LONG,
Types.FLOAT,
Types.DOUBLE,
Types.INT,
Types.DECIMAL)).asInstanceOf[TypeInformation[Any]]
}
}