提交 ed69187a 编写于 作者: 梦境迷离's avatar 梦境迷离

add websocket server example

上级 3ac0756a
......@@ -35,7 +35,8 @@ jar {
}
repositories {
maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
// maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
jcenter()
mavenCentral()
}
......@@ -88,6 +89,7 @@ dependencies {
compile group: 'com.typesafe.akka', name: 'akka-actor_2.12', version: '2.5.23'
compile group: 'com.typesafe.akka', name: 'akka-http_2.12', version: '10.1.10'
compile group: 'com.typesafe.akka', name: 'akka-slf4j_2.12', version: '2.5.23'
compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.23'
//test
testCompile("org.springframework.boot:spring-boot-starter-test")
......
akka {
actor.default-dispatcher.fork-join-executor.parallelism-max = 64
loggers = ["akka.event.slf4j.Slf4jLogger"]
actor-system = "custom-application"
actor.debug.receive = on
loglevel = "debug"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
http.server.websocket.periodic-keep-alive-max-idle = 1 second
http.server.websocket.periodic-keep-alive-mode = pong #客户端将不会回复此类心跳
}
\ No newline at end of file
package cn.edu.layim.server
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.settings.ServerSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.StdIn
/**
* akka-http websocket server
*
* @author liguobin@growingio.com
* @version 1.0,2020/1/22
*/
object WebsocketServer extends App {
println(
"""
| __ __ ___. _________ __ __ _________
|/ \ / \ ____\_ |__ / _____/ ____ ____ | | __ _____/ |_ / _____/ ______________ __ ___________
|\ \/\/ // __ \| __ \ \_____ \ / _ \_/ ___\| |/ // __ \ __\ \_____ \_/ __ \_ __ \ \/ // __ \_ __ \
| \ /\ ___/| \_\ \/ ( <_> ) \___| <\ ___/| | / \ ___/| | \/\ /\ ___/| | \/
| \__/\ / \___ >___ /_______ /\____/ \___ >__|_ \\___ >__| /_______ /\___ >__| \_/ \___ >__|
| \/ \/ \/ \/ \/ \/ \/ \/ \/ \/
|""".stripMargin)
import Directives._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val host = "localhost"
val port = 8080
val IMWebSocketService = {
Flow[Message].mapConcat {
case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Nil
}
}
val IMServerSettings = {
//自定义保持活动数据有效负载
val defaultSettings = ServerSettings(system)
val pingCounter = new AtomicInteger()
val IMWebsocketSettings = defaultSettings.websocketSettings.
withPeriodicKeepAliveData(() => ByteString(s"debug-ping-${pingCounter.incrementAndGet()}"))
defaultSettings.withWebsocketSettings(IMWebsocketSettings)
}
//eg ws://127.0.0.1:8080/websocket?uid=1
val IMRoute = {
path("websocket") {
get {
parameters("uid".as[String]) { uid =>
println("当前有用户连接了 => [id = " + uid + "]")
handleWebSocketMessages(IMWebSocketService)
}
}
}
}
def startUp() {
val bindingFuture = Http().bindAndHandle(IMRoute, host, port, settings = IMServerSettings)
StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
startUp()
}
package cn.edu.layim.util
import java.util
import java.util.List
import java.util.concurrent.ConcurrentHashMap
import java.util.{ HashMap, List }
import cn.edu.layim.Application
import cn.edu.layim.constant.SystemConstant
......@@ -15,7 +16,7 @@ import javax.websocket.Session
import org.slf4j.{ Logger, LoggerFactory }
import scala.beans.BeanProperty
import scala.collection.JavaConversions
import scala.collection.JavaConverters._
/**
......@@ -65,7 +66,7 @@ object WebSocketUtil {
//找到群组id里面的所有用户
val users: List[User] = userService.findUserByGroupId(gid)
//过滤掉本身的uid
JavaConversions.collectionAsScalaIterable(users).filter(_.id != message.getMine.getId)
users.asScala.filter(_.id != message.getMine.getId)
.foreach { user => {
//是否在线
if (WebSocketUtil.getSessions.containsKey(user.getId)) {
......@@ -110,7 +111,7 @@ object WebSocketUtil {
*/
def removeFriend(uId: Integer, friendId: Integer) = synchronized {
//对方是否在线,在线则处理,不在线则不处理
val result = new HashMap[String, String]
val result = new util.HashMap[String, String]
if (sessions.get(friendId) != null) {
result.put("type", "delFriend");
result.put("uId", uId + "");
......@@ -136,7 +137,7 @@ object WebSocketUtil {
addMessage.setRemark(t.getRemark)
addMessage.setType(1)
userService.saveAddMessage(addMessage)
val result = new HashMap[String, String]
val result = new util.HashMap[String, String]
if (sessions.get(to.getId) != null) {
result.put("type", "addGroup");
sendMessage(gson.toJson(result), sessions.get(to.getId))
......@@ -155,12 +156,12 @@ object WebSocketUtil {
addMessage.setFromUid(mine.getId)
addMessage.setTime(DateUtil.getDateTime)
addMessage.setToUid(message.getTo.getId)
val add = gson.fromJson(message.getMsg(), classOf[Add])
val add = gson.fromJson(message.getMsg, classOf[Add])
addMessage.setRemark(add.getRemark)
addMessage.setType(add.getType)
addMessage.setGroupId(add.getGroupId)
userService.saveAddMessage(addMessage)
val result = new HashMap[String, String]
val result = new util.HashMap[String, String]
//如果对方在线,则推送给对方
if (sessions.get(message.getTo.getId) != null) {
result.put("type", "addFriend")
......@@ -174,10 +175,10 @@ object WebSocketUtil {
* @param uid
* @return HashMap[String, String]
*/
def countUnHandMessage(uid: Integer): HashMap[String, String] = synchronized {
def countUnHandMessage(uid: Integer): util.HashMap[String, String] = synchronized {
val count = userService.countUnHandMessage(uid, 0)
LOGGER.info("count = " + count)
val result = new HashMap[String, String]
val result = new util.HashMap[String, String]
result.put("type", "unHandMessage")
result.put("count", count + "")
result
......@@ -189,10 +190,10 @@ object WebSocketUtil {
* @param message
* @return HashMap[String, String]
*/
def checkOnline(message: Message, session: Session): HashMap[String, String] = synchronized {
def checkOnline(message: Message, session: Session): util.HashMap[String, String] = synchronized {
LOGGER.info("监测在线状态" + message.getTo.toString)
val uids = redisService.getSets(SystemConstant.ONLINE_USER)
val result = new HashMap[String, String]
val result = new util.HashMap[String, String]
result.put("type", "checkOnline")
if (uids.contains(message.getTo.getId.toString))
result.put("status", "在线")
......@@ -208,7 +209,7 @@ object WebSocketUtil {
* @param session
*/
def sendMessage(message: String, session: Session): Unit = synchronized {
session.getBasicRemote().sendText(message)
session.getBasicRemote.sendText(message)
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册