在WebSocket中注入spring bean
和Listener一样,websocket不属于spring framework,不是spring framework天然的bean,因此无法在里面直接注入spring的bean。
SpringCongifurator
为了让websocket更好地在Spring framework中工作,WebSocket Endpoint的配置器将继承SpringCongifurator。我们需要在pom.xml中引入相关的jar包:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>${spring.framework.version}</version>
<scope>compile</scope>
</dependency>我们可以通过SpringCongifurator作为websocket endpoint的配置器
@ServerEndpoint(value = "/chat/{sessionId}",
configurator = SpringCongifurator.class)
public class ChatEndpoint {
... ...
}如果需要自定义配置器,具体见下面代码:
@ServerEndpoint(value = "/chat/{sessionId}",
encoders = ChatMessageCodec.class,
decoders = ChatMessageCodec.class,
configurator = ChatEndpoint.EndpointConfigurator.class)
public class ChatEndpoint {
@Inject SessionRegistryService sessionRegisterService;
... ...
// SpringConfigurator是继承了ServerEndpointConfig.Configurator(参见 WebSocket(5):encoder,decoder和configurator#configurator)
// 本例,我们将httpSession存放在websocket.Session的user Properties中,因此采用自定义的方式。为了便于获取,我们提供了两个静态方法。当然我们可以使用 webSocketSession.getUserProperties().get(key) 来获取
// 通过SpringConfigurator,我们就可以在ChatEndpoint中使用Spring的注入,如上面的SessionRegistryService,但ChatEndpoint并不属于Spring framework,不能作为Bean被注入。如果我们想将ChatEndpoint作为一个Bean,可以在RootContext的配置中
// @Bean
// public ChatEndpoint chatEndpoint(){
// return new ChatEndpoint();
// }
// 但是这样一来系统就只有一个ChatEndpoint管理所有的连接,而不是每个ChatEndpoint管理一条连接,处理起来要复杂很多。
public static class EndpointConfigurator extends SpringConfigurator{
private static final String HTTP_SESSION_KEY = "cn.wei.flowingflying.customer_support.http.session";
private static final String PRINCIPAL_KEY = "cn.wei.flowingflying.customer_support.user.principal";
@Override
public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
super.modifyHandshake(config, request, response);
HttpSession session = (HttpSession)request.getHttpSession();
config.getUserProperties().put(HTTP_SESSION_KEY, session);
config.getUserProperties().put(PRINCIPAL_KEY, UserPrincipal.getPrincipal(session));
}
private static HttpSession getExposedSession(Session session){
return (HttpSession) session.getUserProperties().get(HTTP_SESSION_KEY);
}
private static Principal getExposedPrincipal(Session session){
return (Principal)session.getUserProperties().get(PRINCIPAL_KEY);
}
}
}在websocket中使用Scheduler
使用websocket,一般情况下,一个连接对应要给websocket实例,也就是我们不采用在root context中配置websocket为bean,这会导致所有的连接都对应到同一个websocket实例中。也就是websocket实例不作为singleton bean。而之前学习的@Scheduled必须加载singleton bean上,我们需要另辟方法。
下面的例子我们在websocket中通过taskSechdule启动ping-pong机制,在chat结束时终止。在例子中我们接上一学习的例子,演示了在http session删除时,结束chat,重点看看如何使用consumer。
@ServerEndpoint(value = "/chat/{sessionId}",
encoders = ChatMessageCodec.class,
decoders = ChatMessageCodec.class,
configurator = ChatEndpoint.EndpointConfigurator.class)
public class ChatEndpoint {
private static final byte[] pongData = "This is PONG country.".getBytes(StandardCharsets.UTF_8);
private ScheduledFuture<?> pingFuture;
@Inject SessionRegistryService sessionRegisterService;
//我们不能直接在sendPing()前加入@Scheduled,pring的@Schedule只支持singleton bean,而ChatEndpoint不是,我们没有采用在root Context中将其设置为bean,而是每条连接都有一个实例。因此,我们将直接使用TaskScheduler。
@Inject TaskScheduler taskScheduler;
// 在sessionRegisterService的登记和注销httpSession删除的的回调函数,如果我们不将其赋给一个对象,Method Reference中this::httpSessionRemoved每次的值并不一样,要确保注册和注销中是同一个回调函数,需要将其固化。
private final Consumer<HttpSession> callback = this::httpSessionRemoved;
/* 用于在注入后执行。对于支持注入的类都会支持@PostConstruct,不仅是spring,WebSocket的ServerEndpoint支持。在接收到websocket client的连接请求的时候,server endpoint对象被创建,然后执行@postConstruct */
@PostConstruct
public void initialize(){
this.sessionRegisterService.registerOnRemoveCallback(this.callback);
this.pingFuture = this.taskScheduler.scheduleWithFixedDelay(
this::sendPing,
new Date(System.currentTimeMillis() + 25_000L), // start time
25_000L); //delay
}
// httpSession删除时的处理回调函数。
private void httpSessionRemoved(HttpSession httpSession){
if(httpSession == this.httpSession){
synchronized(this){
if(this.closed)
return;
log.info("Chat session ended abruptly by {} logging out.", this.principal.getName());
this.close(ChatService.ReasonForLeaving.LOGGED_OUT, null);
}
}
}
//具体处理chat关闭
private void close(ChatService.ReasonForLeaving reason, String unexpected){
this.closed = true;
if(this.pingFuture.isCancelled())
this.pingFuture.cancel(true);
this.sessionRegisterService.deregisterOnRemoveCallback(this.callback);
}
private void sendPing(){//发送ping
... ...
this.wsSession.getBasicRemote().sendPing(ByteBuffer.wrap(ChatEndpoint.pongData));
}
@OnMessage
public void onPong(PongMessage message){ //收到pong
ByteBuffer data = message.getApplicationData();
if(!Arrays.equals(ChatEndpoint.pongData, data.array()))
log.warn("Received pong message with incorrect payload.");
else
log.debug("Received good pong message.");
}
... ...
}









