案例分析:设计模式与代码的结构特性
时间:2019-12-07 17:28:38
收藏:0
阅读:95
一、案例简介
Java基于多线程和NIO实现聊天室
-
涉及到的技术点
- 线程池ThreadPoolExecutor
- 阻塞队列BlockingQueue,生产者消费者模式
- Selector
- Channel
- ByteBuffer
- ProtoStuff 高性能序列化
- HttpClient连接池
- Spring依赖注入
- lombok简化POJO开发
- 原子变量
- 内置锁
- CompletionService
- log4j+slf4j日志
-
实现的功能
- 登录注销
- 单聊
- 群聊
- 客户端提交任务,下载图片并显示
- 上线下线公告
- 在线用户记录
- 批量下载豆瓣电影的图片,并打为压缩包传输给客户端
-
客户端使用方式:
- 登录:默认用户名是user1-user5,密码分别是pwd1-pwd5
- 例:打开客户端后输入用户名为user1,密码为pwd1
- 注销:关闭客户端即可
- 单聊:@username:message
- 例:@user2:hello
- 群聊:message
- 例:hello,everyone
- 提交任务:task.file:图片的URL / task.crawl_image:豆瓣电影的id[?imageSize=n] 可以加请求参数
- 例1:task.file:https://img1.doubanio.com/view/movie_poster_cover/lpst/public/p2107289058.webp 下载完毕后会弹出一个框,输入想将其保存到的路径,比如E:/img.webp
- 例2:task.crawl_image:1292371?imageSize=2 下载完毕后在弹出的框中输入E:/images.zip
- 登录:默认用户名是user1-user5,密码分别是pwd1-pwd5
二、具体实现
1、ChatServer.java:服务器端主程序代码文件;
public class ChatServer {
public static final int DEFAULT_BUFFER_SIZE = 1024;
public static final int PORT = 9000;
public static final String QUIT = "QUIT";
private AtomicInteger onlineUsers;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private ExecutorService readPool;
private BlockingQueue<Task> downloadTaskQueue;
private TaskManagerThread taskManagerThread;
private ListenerThread listenerThread;
private InterruptedExceptionHandler exceptionHandler;
public ChatServer() {
log.info("服务器启动");
initServer();
}
private void initServer() {
try {
serverSocketChannel = ServerSocketChannel.open();
//切换为非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(PORT));
//获得选择器
selector = Selector.open();
//将channel注册到selector上
//第二个参数是选择键,用于说明selector监控channel的状态
//可能的取值:SelectionKey.OP_READ OP_WRITE OP_CONNECT OP_ACCEPT
//监控的是channel的接收状态
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
this.readPool = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());
this.downloadTaskQueue = new ArrayBlockingQueue<>(20);
this.taskManagerThread = new TaskManagerThread(downloadTaskQueue);
this.taskManagerThread.setUncaughtExceptionHandler(SpringContextUtil.getBean("taskExceptionHandler"));
this.listenerThread = new ListenerThread();
this.onlineUsers = new AtomicInteger(0);
this.exceptionHandler = SpringContextUtil.getBean("interruptedExceptionHandler");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 启动方法,线程最好不要在构造函数中启动,应该作为一个单独方法,或者使用工厂方法来创建实例
* 避免构造未完成就使用成员变量
*/
public void launch() {
new Thread(listenerThread).start();
new Thread(taskManagerThread).start();
}
/**
* 推荐的结束线程的方式是使用中断
* 在while循环开始处检查是否中断,并提供一个方法来将自己中断
* 不要在外部将线程中断
* <p>
* 另外,如果要中断一个阻塞在某个地方的线程,最好是继承自Thread,先关闭所依赖的资源,再关闭当前线程
*/
private class ListenerThread extends Thread {
@Override
public void interrupt() {
try {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
} finally {
super.interrupt();
}
}
@Override
public void run() {
try {
//如果有一个及以上的客户端的数据准备就绪
while (!Thread.currentThread().isInterrupted()) {
//当注册的事件到达时,方法返回;否则,该方法会一直阻塞
selector.select();
//获取当前选择器中所有注册的监听事件
for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext(); ) {
SelectionKey key = it.next();
//删除已选的key,以防重复处理
it.remove();
//如果"接收"事件已就绪
if (key.isAcceptable()) {
//交由接收事件的处理器处理
handleAcceptRequest();
} else if (key.isReadable()) {
//如果"读取"事件已就绪
//取消可读触发标记,本次处理完后才打开读取事件标记
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
//交由读取事件的处理器处理
readPool.execute(new ReadEventHandler(key));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void shutdown() {
Thread.currentThread().interrupt();
}
}
/**
* 关闭服务器
*/
public void shutdownServer() {
try {
taskManagerThread.shutdown();
listenerThread.shutdown();
readPool.shutdown();
serverSocketChannel.close();
System.exit(0);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 处理客户端的连接请求
*/
private void handleAcceptRequest() {
try {
SocketChannel client = serverSocketChannel.accept();
// 接收的客户端也要切换为非阻塞模式
client.configureBlocking(false);
// 监控客户端的读操作是否就绪
client.register(selector, SelectionKey.OP_READ);
log.info("服务器连接客户端:{}",client);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 处于线程池中的线程会随着线程池的shutdown方法而关闭
*/
private class ReadEventHandler implements Runnable {
private ByteBuffer buf;
private SocketChannel client;
private ByteArrayOutputStream baos;
private SelectionKey key;
public ReadEventHandler(SelectionKey key) {
this.key = key;
this.client = (SocketChannel) key.channel();
this.buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
this.baos = new ByteArrayOutputStream();
}
@Override
public void run() {
try {
int size;
while ((size = client.read(buf)) > 0) {
buf.flip();
baos.write(buf.array(), 0, size);
buf.clear();
}
if (size == -1) {
return;
}
log.info("读取完毕,继续监听");
//继续监听读取事件
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
key.selector().wakeup();
byte[] bytes = baos.toByteArray();
baos.close();
Message message = ProtoStuffUtil.deserialize(bytes, Message.class);
MessageHandler messageHandler = SpringContextUtil.getBean("MessageHandler", message.getHeader().getType().toString().toLowerCase());
try {
messageHandler.handle(message, selector, key, downloadTaskQueue, onlineUsers);
} catch (InterruptedException e) {
log.error("服务器线程被中断");
exceptionHandler.handle(client, message);
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
System.out.println("Initialing...");
ChatServer chatServer = new ChatServer();
chatServer.launch();
Scanner scanner = new Scanner(System.in, "UTF-8");
while (scanner.hasNext()) {
String next = scanner.next();
if (next.equalsIgnoreCase(QUIT)) {
System.out.println("服务器准备关闭");
chatServer.shutdownServer();
System.out.println("服务器已关闭");
}
}
}
}
2、ChatClient.java
public class ChatClient extends Frame {
public static final int DEFAULT_BUFFER_SIZE = 1024;
private Selector selector;
private SocketChannel clientChannel;
private ByteBuffer buf;
private TextField tfText;
private TextArea taContent;
private ReceiverHandler listener;
private String username;
private boolean isLogin = false;
private boolean isConnected = false;
private Charset charset = StandardCharsets.UTF_8;
public ChatClient(String name, int x, int y, int w, int h) {
super(name);
initFrame(x, y, w, h);
initNetWork();
}
/**
* 初始化窗体
*
* @param x
* @param y
* @param w
* @param h
*/
private void initFrame(int x, int y, int w, int h) {
this.tfText = new TextField();
this.taContent = new TextArea();
this.setBounds(x, y, w, h);
this.setLayout(new BorderLayout());
this.addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent e) {
setVisible(false);
disConnect();
System.exit(0);
}
});
this.taContent.setEditable(false);
this.add(tfText, BorderLayout.SOUTH);
this.add(taContent, BorderLayout.NORTH);
this.tfText.addActionListener((actionEvent) -> {
String str = tfText.getText().trim();
tfText.setText("");
send(str);
});
this.pack();
this.setVisible(true);
}
/**
* 初始化网络模块
*/
private void initNetWork() {
try {
selector = Selector.open();
clientChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
//设置客户端为非阻塞模式
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
login();
isConnected = true;
} catch (ConnectException e) {
JOptionPane.showMessageDialog(this, "连接服务器失败");
} catch (IOException e) {
e.printStackTrace();
}
}
public void launch() {
this.listener = new ReceiverHandler();
new Thread(listener).start();
}
private void login() {
String username = JOptionPane.showInputDialog("请输入用户名");
String password = JOptionPane.showInputDialog("请输入密码");
Message message = new Message(
MessageHeader.builder()
.type(MessageType.LOGIN)
.sender(username)
.timestamp(System.currentTimeMillis())
.build(), password.getBytes(charset));
try {
clientChannel.write(ByteBuffer.wrap(ProtoStuffUtil.serialize(message)));
} catch (IOException e) {
e.printStackTrace();
}
this.username = username;
}
private void disConnect() {
try {
logout();
if (!isConnected) {
return;
}
listener.shutdown();
//如果发送消息后马上断开连接,那么消息可能无法送达
Thread.sleep(10);
clientChannel.socket().close();
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void logout() {
if (!isLogin) {
return;
}
System.out.println("客户端发送下线请求");
Message message = new Message(
MessageHeader.builder()
.type(MessageType.LOGOUT)
.sender(username)
.timestamp(System.currentTimeMillis())
.build(), null);
try {
clientChannel.write(ByteBuffer.wrap(ProtoStuffUtil.serialize(message)));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 发送信息,监听在回车键上
*
* @param content
*/
public void send(String content) {
if (!isLogin) {
JOptionPane.showMessageDialog(null, "尚未登录");
return;
}
try {
Message message;
//普通模式
if (content.startsWith("@")) {
String[] slices = content.split(":");
String receiver = slices[0].substring(1);
message = new Message(
MessageHeader.builder()
.type(MessageType.NORMAL)
.sender(username)
.receiver(receiver)
.timestamp(System.currentTimeMillis())
.build(), slices[1].getBytes(charset));
} else if (content.startsWith("task")) {
String info = content.substring(content.indexOf(‘.‘) + 1);
int split = info.indexOf(‘:‘);
TaskDescription taskDescription = new TaskDescription(TaskType.valueOf(info.substring(0,split).toUpperCase()), info.substring(split+1));
//处理不同的Task类型
message = new Message(
MessageHeader.builder()
.type(MessageType.TASK)
.sender(username)
.timestamp(System.currentTimeMillis())
.build(), ProtoStuffUtil.serialize(taskDescription));
} else {
//广播模式
message = new Message(
MessageHeader.builder()
.type(MessageType.BROADCAST)
.sender(username)
.timestamp(System.currentTimeMillis())
.build(), content.getBytes(charset));
}
System.out.println(message);
clientChannel.write(ByteBuffer.wrap(ProtoStuffUtil.serialize(message)));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 用于接收信息的线程
*/
private class ReceiverHandler implements Runnable {
private boolean connected = true;
public void shutdown() {
connected = false;
}
public void run() {
try {
while (connected) {
int size = 0;
selector.select();
for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext(); ) {
SelectionKey selectionKey = it.next();
it.remove();
if (selectionKey.isReadable()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((size = clientChannel.read(buf)) > 0) {
buf.flip();
baos.write(buf.array(), 0, size);
buf.clear();
}
byte[] bytes = baos.toByteArray();
baos.close();
Response response = ProtoStuffUtil.deserialize(bytes, Response.class);
handleResponse(response);
}
}
}
} catch (IOException e) {
JOptionPane.showMessageDialog(null, "服务器关闭,请重新尝试连接");
isLogin = false;
}
}
private void handleResponse(Response response) {
System.out.println(response);
ResponseHeader header = response.getHeader();
switch (header.getType()) {
case PROMPT:
if (header.getResponseCode() != null) {
ResponseCode code = ResponseCode.fromCode(header.getResponseCode());
if (code == ResponseCode.LOGIN_SUCCESS) {
isLogin = true;
System.out.println("登录成功");
} else if (code == ResponseCode.LOGOUT_SUCCESS) {
System.out.println("下线成功");
break;
}
}
String info = new String(response.getBody(), charset);
JOptionPane.showMessageDialog(ChatClient.this, info);
break;
case NORMAL:
String content = formatMessage(taContent.getText(), response);
taContent.setText(content);
taContent.setCaretPosition(content.length());
break;
case FILE:
try {
String path = JOptionPane.showInputDialog("请输入保存的文件路径");
byte[] buf = response.getBody();
FileUtil.save(path, buf);
if(path.endsWith("jpg")){
//显示该图片
new PictureDialog(ChatClient.this, "图片", false, path);
}
} catch (IOException e) {
e.printStackTrace();
}
default:
break;
}
}
private String formatMessage(String originalText, Response response) {
ResponseHeader header = response.getHeader();
StringBuilder sb = new StringBuilder();
sb.append(originalText)
.append(header.getSender())
.append(": ")
.append(new String(response.getBody(), charset))
.append(" ")
.append(DateTimeUtil.formatLocalDateTime(header.getTimestamp()))
.append("\n");
return sb.toString();
}
}
private static class PictureDialog extends JDialog {
public PictureDialog(Frame owner, String title, boolean modal,
String path) {
super(owner, title, modal);
ImageIcon icon = new ImageIcon(path);
JLabel lbPhoto = new JLabel(icon);
this.add(lbPhoto);
this.setSize(icon.getIconWidth(), icon.getIconHeight());
this.setVisible(true);
}
}
public static void main(String[] args) {
System.out.println("Initialing...");
ChatClient client = new ChatClient("Client", 200, 200, 300, 200);
client.launch();
}
}
完整源码:https://github.com/songxinjianqwe/Chat.git
原文:https://www.cnblogs.com/kangyuxin/p/12002084.html
评论(0)