柚子快報激活碼778899分享:WebSocket 協(xié)議介紹
柚子快報激活碼778899分享:WebSocket 協(xié)議介紹
前言
一.通用協(xié)議設計
參考鏈接
/*
+---------------------------------------------------------------+
| 魔數(shù) 2byte | 協(xié)議版本號 1byte | 序列化算法 1byte | 報文類型 1byte |
+---------------------------------------------------------------+
| 狀態(tài) 1byte | 保留字段 4byte | 數(shù)據(jù)長度 4byte |
+---------------------------------------------------------------+
| 數(shù)據(jù)內(nèi)容 (長度不定) | 校驗字段 2byte |
+---------------------------------------------------------------+
*/
正文
二.WebSocket 協(xié)議
2.1 基礎數(shù)據(jù)幀
術語說明大小FIN如果是 1,表示這是消息(message)的最后一個分片(fragment);如果是 0,表示不是是消息(message)的最后一個分片(fragment)1bitRSV1, RSV2, RSV3一般情況下全為 0。當客戶端、服務端協(xié)商采用 WebSocket 擴展時,這三個標志位可以非 0,且值的含義由擴展進行定義。如果出現(xiàn)非零的值,且并沒有采用 WebSocket 擴展,連接出錯每個1 bitOpcode操作代碼,Opcode 的值決定了應該如何解析后續(xù)的數(shù)據(jù)載荷(data payload)。如果操作代碼是不認識的,那么接收端應該斷開連接(fail the connection)4 bitsMask表示是否要對數(shù)據(jù)載荷進行掩碼操作。從客戶端向服務端發(fā)送數(shù)據(jù)時,需要對數(shù)據(jù)進行掩碼操作;從服務端向客戶端發(fā)送數(shù)據(jù)時,不需要對數(shù)據(jù)進行掩碼操作。如果服務端接收到的數(shù)據(jù)沒有進行過掩碼操作,服務端需要斷開連接。如果 Mask 是 1,那么在 Masking-key 中會定義一個掩碼鍵(masking key),并用這個掩碼鍵來對數(shù)據(jù)載荷進行反掩碼。所有客戶端發(fā)送到服務端的數(shù)據(jù)幀,Mask 都是 1。1bitPayload length數(shù)據(jù)載荷的長度,單位是字節(jié)。假設數(shù) Payload length === x,如果:x 為 0~126:數(shù)據(jù)的長度為 x 字節(jié)。x 為 126:后續(xù) 2 個字節(jié)代表一個 16 位的無符號整數(shù),該無符號整數(shù)的值為數(shù)據(jù)的長度。x 為 127:后續(xù) 8 個字節(jié)代表一個 64 位的無符號整數(shù)(最高位為 0),該無符號整數(shù)的值為數(shù)據(jù)的長度。此外,如果 payload length 占用了多個字節(jié)的話,payload length 的二進制表達采用網(wǎng)絡序(big endian,重要的位在前)。7 bits, 7+16 bits, 或者 7+64 bitsMasking-key所有從客戶端傳送到服務端的數(shù)據(jù)幀,數(shù)據(jù)載荷都進行了掩碼操作,Mask 為 1,且攜帶了 4 字節(jié)的 Masking-key。如果 Mask 為 0,則沒有 Masking-key。備注:載荷數(shù)據(jù)的長度,不包括 mask key 的長度。0 or 4 bytesPayload data“負載數(shù)據(jù)”定義為“擴展數(shù)據(jù)”連接“應用數(shù)據(jù)”。-Extension data: x byte “擴展數(shù)據(jù)”是 0 字節(jié)除非已經(jīng)協(xié)商了一個擴展。任何擴展必須指定“擴展數(shù)據(jù)”的長度,或長度是如何計算的,以及擴展如何使用必須在打開階段握手期間協(xié)商。如果存在,“擴展數(shù)據(jù)”包含在總負載長度中。- Application data: y bytes任意的“應用數(shù)據(jù)”,占用“擴展數(shù)據(jù)”之后幀的剩余部分。“應用數(shù)據(jù)”的長度等于負載長度減去“擴展數(shù)據(jù)”長度。(x+y) bytes
Opcode:
%x0 代表一個繼續(xù)幀%x1 代表一個文本幀%x2 代表一個二進制幀%x3-7 保留用于未來的非控制幀%x8 代表連接關閉%x9 代表 ping%xA 代表 pong%xB-F 保留用于未來的控制幀
2.2 數(shù)據(jù)幀另外一種表達方式
ws-frame = frame-fin ; 1 bit in length
frame-rsv1 ; 1 bit in length
frame-rsv2 ; 1 bit in length
frame-rsv3 ; 1 bit in length
frame-opcode ; 4 bits in length
frame-masked ; 1 bit in length
frame-payload-length ; either 7, 7+16,
; or 7+64 bits in
; length
[ frame-masking-key ] ; 32 bits in length
frame-payload-data ; n*8 bits in
; length, where
; n >= 0
frame-fin = %x0 ; more frames of this message follow
/ %x1 ; final frame of this message
; 1 bit in length
frame-rsv1 = %x0 / %x1
; 1 bit in length, MUST be 0 unless
; negotiated otherwise
frame-rsv2 = %x0 / %x1
; 1 bit in length, MUST be 0 unless
; negotiated otherwise
frame-rsv3 = %x0 / %x1
; 1 bit in length, MUST be 0 unless
; negotiated otherwise
frame-opcode = frame-opcode-non-control /
frame-opcode-control /
frame-opcode-cont
frame-opcode-cont = %x0 ; frame continuation
frame-opcode-non-control= %x1 ; text frame
/ %x2 ; binary frame
/ %x3-7
; 4 bits in length,
; reserved for further non-control frames
frame-opcode-control = %x8 ; connection close
/ %x9 ; ping
/ %xA ; pong
/ %xB-F ; reserved for further control
; frames
; 4 bits in length
frame-masked = %x0
; frame is not masked, no frame-masking-key
/ %x1
; frame is masked, frame-masking-key present
; 1 bit in length
frame-payload-length = ( %x00-7D )
/ ( %x7E frame-payload-length-16 )
/ ( %x7F frame-payload-length-63 )
; 7, 7+16, or 7+64 bits in length,
; respectively
frame-payload-length-16 = %x0000-FFFF ; 16 bits in length
frame-payload-length-63 = %x0000000000000000-7FFFFFFFFFFFFFFF
; 64 bits in length
frame-masking-key = 4( %x00-FF )
; present only if frame-masked is 1
; 32 bits in length
frame-payload-data = (frame-masked-extension-data
frame-masked-application-data)
; when frame-masked is 1
/ (frame-unmasked-extension-data
frame-unmasked-application-data)
; when frame-masked is 0
frame-masked-extension-data = *( %x00-FF )
; reserved for future extensibility
; n*8 bits in length, where n >= 0
frame-masked-application-data = *( %x00-FF )
; n*8 bits in length, where n >= 0
frame-unmasked-extension-data = *( %x00-FF )
; reserved for future extensibility
; n*8 bits in length, where n >= 0
frame-unmasked-application-data = *( %x00-FF )
; n*8 bits in length, where n >= 0
2.3 WebSocket掩碼的作用
WebSocket的掩碼算法是一種數(shù)據(jù)加密方法,?用于保護數(shù)據(jù)傳輸?shù)陌踩浴?這種算法通過異或運算對數(shù)據(jù)進行處理,?以防止早期版本的協(xié)議中存在的代理緩存污染攻擊等問題。?具體來說,?掩碼算法的實現(xiàn)過程如下:?
掩碼的作用:?掩碼算法并不是為了防止數(shù)據(jù)泄密,?而是為了防止代理緩存污染攻擊等問題。?它通過對數(shù)據(jù)進行異或運算,?使得原始數(shù)據(jù)在傳輸過程中被改變,?只有在接收端使用相同的掩碼進行反向操作,?才能還原出原始數(shù)據(jù)。? 算法描述:?對于每個需要發(fā)送的字節(jié),?它通過與掩碼密鑰進行異或運算來生成傳輸?shù)臄?shù)據(jù)。?具體來說,?對于原始數(shù)據(jù)中的每個字節(jié)original-octet-i,?它首先計算j = i MOD 4來獲取掩碼密鑰中的對應字節(jié)masking-key-octet-j,為mask key第j個字節(jié)。?然后,?將original-octet-i與masking-key-octet-j進行異或運算,?得到的結果即為傳輸?shù)臄?shù)據(jù)即:j = i MOD 4 transformed-octet-i = original-octet-i XOR masking-key-octet-j。?
void umask(char *payload, int len, char *mask)
{
int i = 0;
for (i = 0; i < len; i++)
{
payload[i] ^= mask[i % 4];
}
}
示例:?以客戶端發(fā)送語音文件到服務端的場景為例,?客戶端首先發(fā)送txt消息文件名稱,?然后再發(fā)送bin消息二進制流數(shù)據(jù)。?在這個過程中,?客戶端對需要發(fā)送的字符與掩碼進行異或運算,?生成用于網(wǎng)絡傳輸?shù)臄?shù)據(jù)。?例如,?字符’t’的ASCII值為116,?與掩碼14進行異或運算后,?得到的結果用于傳輸。?
待補充
2.4 分片(Fragmentation)
2.5 控制幀
Close:0x8Ping:0x9Pong:0xA操作碼 0xB-0xF 保留用于未來尚未定義的控制幀。 -備注:控制幀用于傳達有關WebSocket的狀態(tài)??刂茙梢圆迦氲椒制⒌闹虚g。 所有控制幀必須有一個125字節(jié)的負載長度或更少, 必須不被分段。
2.5.1 Close
Close幀包含 0x8 操作碼,Close幀可以包含數(shù)據(jù),閉幀可以包含內(nèi)容體(“幀的“應用數(shù)據(jù)”部分)指示一個關閉的原因。如果客戶端和服務器同時都發(fā)送了一個關閉消息,兩個端點都將發(fā)送和接收一個關閉消息且應該認為WebSocket連接關閉了并關閉底層TCP連接。如果有內(nèi)容體,內(nèi)容體的頭兩個字節(jié)必須是2字節(jié)的無符號整數(shù)(按網(wǎng)絡字節(jié)順序),這句話理解如下代碼。netty中有內(nèi)容體的“CloseWebSocketFrame”操作示例:
CloseWebSocketFrame closeFrame = new CloseWebSocketFrame(closeStatus.code(), reasonText);
內(nèi)層操作原理
private static ByteBuf newBinaryData(int statusCode, String reasonText) {
if (reasonText == null) {
reasonText = StringUtil.EMPTY_STRING;
}
//空出兩字節(jié),用于寫狀態(tài)
ByteBuf binaryData = Unpooled.buffer(2 + reasonText.length());
binaryData.writeShort(statusCode);
if (!reasonText.isEmpty()) {
binaryData.writeCharSequence(reasonText, CharsetUtil.UTF_8);
}
return binaryData;
}
其中WebSocket關閉幀已經(jīng)有預定義的狀態(tài)碼,可查看RFC6455(點擊查看中文)文檔;netty的“WebSocketCloseStatus.java”中定義的狀態(tài)碼與RFC6455文檔一致。
2.5.2 Ping
Ping 幀包含 0x9 操作碼,Ping幀可以包含數(shù)據(jù) 注意:一個 Ping 即可以充當一個 keepalive,也可以作為驗證遠程端點仍可響應的手段。netty中Ping若包含數(shù)據(jù),但是ChannelPipeline中如果添加了netty的handler“WebSocketProtocolHandler”,該Ping消息會在decode方法中被處理并且返回。 Ping包含數(shù)據(jù)在netty的寫法之一
//內(nèi)容可以多種類型
WebSocketFrame pingFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer("Hello!??!".getBytes(CharsetUtil.UTF_8)));
//pingFrame.content();內(nèi)容數(shù)據(jù)
netty的WebSocketProtocolHandler中的方法decode
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List
if (frame instanceof PingWebSocketFrame) {
frame.content().retain();//調(diào)用 retain 方法計數(shù)加 1,表示調(diào)用者沒用完之前,其它 handler 即使調(diào)用了 release 也不會造成回收
ctx.writeAndFlush(new PongWebSocketFrame(frame.content()));
readIfNeeded(ctx);
return;
}
if (frame instanceof PongWebSocketFrame && dropPongFrames) {
readIfNeeded(ctx);
return;
}
out.add(frame.retain());
}
2.5.3 Pong
Pong幀包含 0xA 操作碼,Pong幀可以包含數(shù)據(jù)一個Pong 幀在響應中發(fā)送到一個Ping幀必須有在將回復的Ping幀的消息內(nèi)容體中發(fā)現(xiàn)的相同的“應用數(shù)據(jù)”。如果端點接收到一個Ping幀且尚未在響應中發(fā)送Pong幀到之前的Ping幀,端點可以選擇僅為最近處理的Ping幀發(fā)送一個Pong幀
2.6 數(shù)據(jù)幀(payload)
數(shù)據(jù)幀操作碼最高位是0的操作碼標識, 0x3-0x7 保留用于未來尚未定義的數(shù)據(jù)幀,當前為數(shù)據(jù)幀定義的操作碼有:
文本幀:0x1二進制:0x2
三、netty中websocket協(xié)議的使用
1.簡單示例
(1)服務端
package io.netty.example.test.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
private static ServerBootstrap bootstrap = null;
private static EventLoopGroup bossGroup = null;
private static EventLoopGroup workGroup = null;
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
boolean listenResuset = false;
try {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup(1);
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 1024, 1024 * 1024))
.childHandler(new ChannelInitializer
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("idle", new IdleStateHandler(20000, 20000, 120000, TimeUnit.MILLISECONDS));
pipeline.addLast("IdleState", new SocketIdleStateTrigger());
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast("log-handler", new LoggingHandler(LogLevel.TRACE));
pipeline.addLast("websocket-compression", new WebSocketServerCompressionHandler());//websocket數(shù)據(jù)壓縮
pipeline.addLast("protocol-handler", new WebSocketServerProtocolHandler("/ws", null, true, 1024 * 1024, true));
pipeline.addLast("websocket-handler", new WebSocketFrameHandler());
}
});
listenResuset = bootstrap.bind(9080).await(5000, TimeUnit.MICROSECONDS);
} catch (Exception e) {
log.error("start error.", e);
} finally {
}
}
@ChannelHandler.Sharable
static class WebSocketFrameHandler extends SimpleChannelInboundHandler
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
log.info("channelRead0 msg class:{}", msg.getClass());
if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
String text = frame.text();
log.info("channelRead0 rev msg:{}", text);
if ("A".equals(text)) {
PingWebSocketFrame pingWebSocketFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer("PINGPINGPING".getBytes(CharsetUtil.UTF_8)));
ctx.channel().writeAndFlush(pingWebSocketFrame);
}
} else if (msg instanceof PingWebSocketFrame) {
// 處理Ping消息
PingWebSocketFrame pingFrame = (PingWebSocketFrame) msg;
// 獲取消息內(nèi)容
String content = pingFrame.content().toString(CharsetUtil.UTF_8);
log.info("Received Ping with message:{}", content);
}
}
}
@ChannelHandler.Sharable
static class SocketIdleStateTrigger extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
log.info("WRITER_IDLE channelId:{}", ctx.channel().id().asLongText());
} else if (state == IdleState.READER_IDLE) {
log.info("READER_IDLE channelId:{}", ctx.channel().id().asLongText());
} else if (state == IdleState.ALL_IDLE) {
// 太長時間無收發(fā)消息,一般要做斷開連接
log.info("ALL_IDLE channelId:{}", ctx.channel().id().asLongText());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
}
(2)客戶端
package io.netty.example.test.websocket;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.CharsetUtil;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
public class WebSocketClient {
static final String URL = System.getProperty("url", "ws://127.0.0.1:9080/ws");
public static void main(String[] args) throws Exception {
URI uri = new URI(URL);
String scheme = uri.getScheme() == null? "ws" : uri.getScheme();
final String host = uri.getHost() == null? "127.0.0.1" : uri.getHost();
final int port;
if (uri.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = uri.getPort();
}
EventLoopGroup group = new NioEventLoopGroup();
try {
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
final WebSocketClientHandler handler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE,
handler);
}
});
Channel ch = b.connect(host, port).sync().channel();
handler.handshakeFuture().sync();
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String msg = console.readLine();
if (msg == null) {
break;
} else if ("bye".equals(msg.toLowerCase())) {
ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync();
break;
} else if ("ping".equals(msg.toLowerCase())) {
//WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("Hello!!!".getBytes(CharsetUtil.UTF_8))));
ch.writeAndFlush(frame);
} else if ("test-ping".equals(msg.toLowerCase())) {
PingWebSocketFrame pingWebSocketFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer("PINGPINGPING".getBytes(CharsetUtil.UTF_8)));
ch.writeAndFlush(pingWebSocketFrame);
} else {
WebSocketFrame frame = new TextWebSocketFrame(msg);
ch.writeAndFlush(frame);
}
}
} finally {
group.shutdownGracefully();
}
}
static class WebSocketClientHandler extends SimpleChannelInboundHandler
private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("WebSocket Client disconnected!");
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException e) {
System.out.println("WebSocket Client failed to connect");
handshakeFuture.setFailure(e);
}
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
System.out.println("WebSocket Client received message: " + textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong: " + frame.content().toString(CharsetUtil.UTF_8));
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("WebSocket Client received closing");
ch.close();
} else if (frame instanceof PingWebSocketFrame) {
PingWebSocketFrame pingFrame = (PingWebSocketFrame) frame;
ByteBuf content = pingFrame.content();
System.out.println("WebSocket Client received ping:" + content.toString(CharsetUtil.UTF_8));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
}
}
2.封裝信令示例
netty websocket使用
附錄
參考資料
RFC6455netty源碼自定義協(xié)議通信
柚子快報激活碼778899分享:WebSocket 協(xié)議介紹
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權,聯(lián)系刪除。