纯netty,没有websocket
立即下载
资源介绍:
纯netty,没有websocket
package io.renren.modules.test.netty;
import cn.hutool.json.JSONUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.json.JSONObject;
/**
* 自定义的Handler需要继承Netty规定好的HandlerAdapter
* 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
* ChannelInboundHandlerAdapter(入站处理器)
* ChannelOutboundHandler(出站处理器)
*
* ChannelInboundHandlerAdapter处理器常用的事件有:
* 注册事件 fireChannelRegistered。
* 连接建立事件 fireChannelActive。
* 读事件和读完成事件 fireChannelRead、ChannelReadComplete。
* 异常通知事件 fireExceptionCaught。
* 用户自定义事件 fireEventTriggered。
* Channel 可写状态变化事件 fireChannelWritabilityChanged。
* 连接关闭事件 fireChannelInactive。
* 都有对对应的
*
*ChannelOutboundHandler处理器常用的事件有:
* 端口绑定 bind。
* 连接服务端 connect。
* 写事件 write。
* 刷新时间 flush。
* 读事件 read。
* 主动断开连接 disconnect。
* 关闭 channel 事件 close。
*
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String s = JSONUtil.toJsonStr(msg);
System.out.println("发送过来的原始msg:"+s);
ByteBuf byteBuf = (ByteBuf) msg;
try {
// 将接收到的ByteBuf转换为字符串
String messageStr = byteBuf.toString(CharsetUtil.UTF_8);
// 解析接收到的JSON消息
JSONObject jsonMsg = new JSONObject(messageStr);
// 获取消息中的各个字段
String msgId = jsonMsg.getString("msgId");
int msgType = jsonMsg.getInt("msgType");
String fromId = jsonMsg.getString("fromId");
long timestamp = jsonMsg.getLong("timestamp");
JSONObject extend = jsonMsg.getJSONObject("extend");
// 处理消息,根据msgType执行不同的业务逻辑
switch (msgType) {
case 1001:
handleMsgType1001(ctx, msgId, fromId, timestamp, extend);
break;
// 可以添加其他类型的处理逻辑
default:
handleUnknownMessageType(ctx, msgType, msgId);
break;
}
} catch (Exception e) {
// 消息格式错误处理
System.err.println("接收到的消息格式错误:" + e.getMessage());
} finally {
// 释放ByteBuf资源
byteBuf.release();
}
}
private void handleMsgType1001(ChannelHandlerContext ctx, String msgId, String fromId, long timestamp, JSONObject extend) {
// 处理消息类型为1001的业务逻辑
String token = extend.getString("token");
// 可以根据具体业务逻辑进一步处理
System.out.println("处理消息类型为1001的业务逻辑,token=" + token);
// 假设服务端回复一个确认消息给客户端
JSONObject response = new JSONObject();
response.put("msgId", msgId);
response.put("type", "ack");
response.put("timestamp", System.currentTimeMillis());
ctx.writeAndFlush(Unpooled.copiedBuffer(response.toString(), CharsetUtil.UTF_8));
}
private void handleUnknownMessageType(ChannelHandlerContext ctx, int msgType, String msgId) {
// 处理未知类型的消息
System.out.println("收到未知类型的消息,msgType=" + msgType + ", msgId=" + msgId);
// 可以根据具体情况进行处理,例如记录日志或发送错误消息给客户端
}
// 处理数据消息
private void handleDataMessage(ChannelHandlerContext ctx, JSONObject jsonMsg) {
// 处理数据逻辑,根据具体业务需求处理数据
}
// 处理未知类型的消息
private void handleUnknownMessage(ChannelHandlerContext ctx, JSONObject jsonMsg) {
// 处理未知类型的消息,可以记录日志或者发送错误响应给客户端
}
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//// //获取客户端发送过来的消息
// ByteBuf byteBuf = (ByteBuf) msg;
// System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
// //获取到线程池eventLoop,添加线程,执行
//// ctx.channel().eventLoop().execute(new Runnable() {
//// @Override
//// public void run() {
//// try {
//// //长时间操作,不至于长时间的业务操作导致Handler阻塞
//// Thread.sleep(1000);
//// ByteBuf byteBuf = (ByteBuf) msg;
//// System.out.println("长时间的业务处理:"+"发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
//// } catch (Exception e) {
//// e.printStackTrace();
//// }
//// }
//// });
//
//
// }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,并给你发送一个消息:哈哈。。。", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常,关闭通道
ctx.close();
}
}