From ed69187a974cd0ecb01116ae3259c90ff3c36514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=A6=E5=A2=83=E8=BF=B7=E7=A6=BB?= Date: Wed, 22 Jan 2020 21:23:19 +0800 Subject: [PATCH] add websocket server example --- build.gradle | 4 +- src/main/resources/application.conf | 10 +++ .../cn/edu/layim/server/WebsocketServer.scala | 82 +++++++++++++++++++ .../cn/edu/layim/util/WebSocketUtil.scala | 25 +++--- 4 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 src/main/resources/application.conf create mode 100644 src/main/scala/cn/edu/layim/server/WebsocketServer.scala diff --git a/build.gradle b/build.gradle index de0011b..8eb2566 100644 --- a/build.gradle +++ b/build.gradle @@ -35,7 +35,8 @@ jar { } repositories { - maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' } +// maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' } + jcenter() mavenCentral() } @@ -88,6 +89,7 @@ dependencies { compile group: 'com.typesafe.akka', name: 'akka-actor_2.12', version: '2.5.23' compile group: 'com.typesafe.akka', name: 'akka-http_2.12', version: '10.1.10' compile group: 'com.typesafe.akka', name: 'akka-slf4j_2.12', version: '2.5.23' + compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.23' //test testCompile("org.springframework.boot:spring-boot-starter-test") diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..8dc2b40 --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,10 @@ +akka { + actor.default-dispatcher.fork-join-executor.parallelism-max = 64 + loggers = ["akka.event.slf4j.Slf4jLogger"] + actor-system = "custom-application" + actor.debug.receive = on + loglevel = "debug" + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + http.server.websocket.periodic-keep-alive-max-idle = 1 second + http.server.websocket.periodic-keep-alive-mode = pong #客户端将不会回复此类心跳 +} \ No newline at end of file diff --git a/src/main/scala/cn/edu/layim/server/WebsocketServer.scala b/src/main/scala/cn/edu/layim/server/WebsocketServer.scala new file mode 100644 index 0000000..56e54af --- /dev/null +++ b/src/main/scala/cn/edu/layim/server/WebsocketServer.scala @@ -0,0 +1,82 @@ +package cn.edu.layim.server + + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage } +import akka.http.scaladsl.server.Directives +import akka.http.scaladsl.settings.ServerSettings +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.util.ByteString + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.io.StdIn + +/** + * akka-http websocket server + * + * @author liguobin@growingio.com + * @version 1.0,2020/1/22 + */ +object WebsocketServer extends App { + + println( + """ + | __ __ ___. _________ __ __ _________ + |/ \ / \ ____\_ |__ / _____/ ____ ____ | | __ _____/ |_ / _____/ ______________ __ ___________ + |\ \/\/ // __ \| __ \ \_____ \ / _ \_/ ___\| |/ // __ \ __\ \_____ \_/ __ \_ __ \ \/ // __ \_ __ \ + | \ /\ ___/| \_\ \/ ( <_> ) \___| <\ ___/| | / \ ___/| | \/\ /\ ___/| | \/ + | \__/\ / \___ >___ /_______ /\____/ \___ >__|_ \\___ >__| /_______ /\___ >__| \_/ \___ >__| + | \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ + |""".stripMargin) + + import Directives._ + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + val host = "localhost" + val port = 8080 + + val IMWebSocketService = { + Flow[Message].mapConcat { + case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil + case bm: BinaryMessage => + bm.dataStream.runWith(Sink.ignore) + Nil + } + } + + val IMServerSettings = { + //自定义保持活动数据有效负载 + val defaultSettings = ServerSettings(system) + val pingCounter = new AtomicInteger() + val IMWebsocketSettings = defaultSettings.websocketSettings. + withPeriodicKeepAliveData(() => ByteString(s"debug-ping-${pingCounter.incrementAndGet()}")) + defaultSettings.withWebsocketSettings(IMWebsocketSettings) + } + + //eg ws://127.0.0.1:8080/websocket?uid=1 + val IMRoute = { + path("websocket") { + get { + parameters("uid".as[String]) { uid => + println("当前有用户连接了 => [id = " + uid + "]") + handleWebSocketMessages(IMWebSocketService) + } + } + } + } + + def startUp() { + val bindingFuture = Http().bindAndHandle(IMRoute, host, port, settings = IMServerSettings) + StdIn.readLine() + bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate()) + } + + startUp() +} + + diff --git a/src/main/scala/cn/edu/layim/util/WebSocketUtil.scala b/src/main/scala/cn/edu/layim/util/WebSocketUtil.scala index bc5967b..79140af 100644 --- a/src/main/scala/cn/edu/layim/util/WebSocketUtil.scala +++ b/src/main/scala/cn/edu/layim/util/WebSocketUtil.scala @@ -1,7 +1,8 @@ package cn.edu.layim.util +import java.util +import java.util.List import java.util.concurrent.ConcurrentHashMap -import java.util.{ HashMap, List } import cn.edu.layim.Application import cn.edu.layim.constant.SystemConstant @@ -15,7 +16,7 @@ import javax.websocket.Session import org.slf4j.{ Logger, LoggerFactory } import scala.beans.BeanProperty -import scala.collection.JavaConversions +import scala.collection.JavaConverters._ /** @@ -65,7 +66,7 @@ object WebSocketUtil { //找到群组id里面的所有用户 val users: List[User] = userService.findUserByGroupId(gid) //过滤掉本身的uid - JavaConversions.collectionAsScalaIterable(users).filter(_.id != message.getMine.getId) + users.asScala.filter(_.id != message.getMine.getId) .foreach { user => { //是否在线 if (WebSocketUtil.getSessions.containsKey(user.getId)) { @@ -110,7 +111,7 @@ object WebSocketUtil { */ def removeFriend(uId: Integer, friendId: Integer) = synchronized { //对方是否在线,在线则处理,不在线则不处理 - val result = new HashMap[String, String] + val result = new util.HashMap[String, String] if (sessions.get(friendId) != null) { result.put("type", "delFriend"); result.put("uId", uId + ""); @@ -136,7 +137,7 @@ object WebSocketUtil { addMessage.setRemark(t.getRemark) addMessage.setType(1) userService.saveAddMessage(addMessage) - val result = new HashMap[String, String] + val result = new util.HashMap[String, String] if (sessions.get(to.getId) != null) { result.put("type", "addGroup"); sendMessage(gson.toJson(result), sessions.get(to.getId)) @@ -155,12 +156,12 @@ object WebSocketUtil { addMessage.setFromUid(mine.getId) addMessage.setTime(DateUtil.getDateTime) addMessage.setToUid(message.getTo.getId) - val add = gson.fromJson(message.getMsg(), classOf[Add]) + val add = gson.fromJson(message.getMsg, classOf[Add]) addMessage.setRemark(add.getRemark) addMessage.setType(add.getType) addMessage.setGroupId(add.getGroupId) userService.saveAddMessage(addMessage) - val result = new HashMap[String, String] + val result = new util.HashMap[String, String] //如果对方在线,则推送给对方 if (sessions.get(message.getTo.getId) != null) { result.put("type", "addFriend") @@ -174,10 +175,10 @@ object WebSocketUtil { * @param uid * @return HashMap[String, String] */ - def countUnHandMessage(uid: Integer): HashMap[String, String] = synchronized { + def countUnHandMessage(uid: Integer): util.HashMap[String, String] = synchronized { val count = userService.countUnHandMessage(uid, 0) LOGGER.info("count = " + count) - val result = new HashMap[String, String] + val result = new util.HashMap[String, String] result.put("type", "unHandMessage") result.put("count", count + "") result @@ -189,10 +190,10 @@ object WebSocketUtil { * @param message * @return HashMap[String, String] */ - def checkOnline(message: Message, session: Session): HashMap[String, String] = synchronized { + def checkOnline(message: Message, session: Session): util.HashMap[String, String] = synchronized { LOGGER.info("监测在线状态" + message.getTo.toString) val uids = redisService.getSets(SystemConstant.ONLINE_USER) - val result = new HashMap[String, String] + val result = new util.HashMap[String, String] result.put("type", "checkOnline") if (uids.contains(message.getTo.getId.toString)) result.put("status", "在线") @@ -208,7 +209,7 @@ object WebSocketUtil { * @param session */ def sendMessage(message: String, session: Session): Unit = synchronized { - session.getBasicRemote().sendText(message) + session.getBasicRemote.sendText(message) } /** -- GitLab