Java NIO
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
如果你至今还是在怀疑Java的性能,说明你的思想和观念已经完全落伍了,Java一两年就应该用新的名词来定义。从JDK1.5开始又要提供关于线程、并发等新性能的支持,Java应用在游戏等适时领域方面的机会已经成熟,Java在稳定自己中间件地位后,开始蚕食传统C的领域。那么今天就用Java NIO写一个服务端。
线程池 ThreadPoolExecutor:
ThreadPoolExecutor是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
数据库连接池ComboPooledDataSource:
关于数据库连接池的使用,首先我们要明白我们为什么要用它,对应普通的数据库连接操作,通常会涉及到以下一些操作是比较耗时的: 网络通讯,涉及到网络延时及协议通讯 身份验证,涉及安全性检查 连接合法性检查,主要是检查所连接的数据库是否存在 并发控制机制 构造并初始化输出缓冲区 连接成功后的信息保存,日志存储 服务器性能 数据库配置优化 系统分配内存资源 等等~~~状况,导致数据库连接操作比较耗时,~~~而且每次都得花费0.05s~1s的时间 但是使用连接池技术,本质上就是在一个请求对应的连接,都由一个线程池来维护着,也就是说“上下文切换”的代价是线程级别(所谓的纳秒级),对于大规模的并发访问,就算以每秒几亿级别的访问量都是不成问题的。
上代码
SocketConnectThread 类负责处理客户端的连接,然后分发读取数据任务
SocketWorkThread负责读取数据再分到写入线程
SocketWriteThread负责把读取到数据和要返回客户端的数据处理
下面是开发者使用的,上面全部是封装的内部机制,相当于服务端的通讯内核不需要开发者改动。 LogicPro处理服务端读取到的数据和要写的数据(面对开发者)
SocketServer启动服务
SocketClient测试客户端,也可以用SocketChannel加Selector实现,也是非阻塞模式,看项目需求。实现和服务端基本上相似,可参考服务端实现。
源码下载
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
如果你至今还是在怀疑Java的性能,说明你的思想和观念已经完全落伍了,Java一两年就应该用新的名词来定义。从JDK1.5开始又要提供关于线程、并发等新性能的支持,Java应用在游戏等适时领域方面的机会已经成熟,Java在稳定自己中间件地位后,开始蚕食传统C的领域。那么今天就用Java NIO写一个服务端。
线程池 ThreadPoolExecutor:
ThreadPoolExecutor是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
数据库连接池ComboPooledDataSource:
关于数据库连接池的使用,首先我们要明白我们为什么要用它,对应普通的数据库连接操作,通常会涉及到以下一些操作是比较耗时的: 网络通讯,涉及到网络延时及协议通讯 身份验证,涉及安全性检查 连接合法性检查,主要是检查所连接的数据库是否存在 并发控制机制 构造并初始化输出缓冲区 连接成功后的信息保存,日志存储 服务器性能 数据库配置优化 系统分配内存资源 等等~~~状况,导致数据库连接操作比较耗时,~~~而且每次都得花费0.05s~1s的时间 但是使用连接池技术,本质上就是在一个请求对应的连接,都由一个线程池来维护着,也就是说“上下文切换”的代价是线程级别(所谓的纳秒级),对于大规模的并发访问,就算以每秒几亿级别的访问量都是不成问题的。
上代码
SocketConnectThread 类负责处理客户端的连接,然后分发读取数据任务
package com.soft.nio.connector.socket; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import com.soft.nio.connector.core.NetTool; import com.soft.nio.connector.core.ThreadPool; /** * 监听连接线程 * * @author leehom * */ public class SocketConnectThread implements Runnable { /** selector专门用来监听client连接 **/ protected Selector selector; protected SocketWorkThread workThread; private ServerSocketChannel server; private static SocketConnectThread socketConnectThread; private boolean stop = false; public static final int BLOCK = 3;// 100KB数据,多了自动断开连接 private static Map<String, Object> black_map; private SocketConnectThread(int port) { super(); // TODO Auto-generated constructor stub // 初始大小10000 black_map = new HashMap<String, Object>(10000); try { // 打开一个选择器 selector = Selector.open(); // 打开服务器套接字通道 server = ServerSocketChannel.open(); // 启用/禁用 SO_REUSEADDR 套接字选项 server.socket().setReuseAddress(true); // 调整此通道的阻塞模式 server.configureBlocking(false); // 将 ServerSocket 绑定到特定地址 server.socket().bind(new InetSocketAddress(port)); // 向给定的选择器注册此通道,返回一个选择键 server.register(selector, SelectionKey.OP_ACCEPT); workThread = new SocketWorkThread(); ThreadPool.getInstance().execute(this); System.out.println("open socket server->port:" + port); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static SocketConnectThread newInstance(int port) { if (socketConnectThread == null) socketConnectThread = new SocketConnectThread(port); return socketConnectThread; } public static SocketConnectThread getInstance() { return socketConnectThread; } public static boolean inBlackList(String ip) { return black_map.containsKey(ip); } public static void addBlackList(String ip) { black_map.put(ip, null); } public static void resetBlackList() { // 重黑黑名单 System.out.println("重黑黑名单"); black_map.clear(); try { List<String> list = NetTool.readBlackList(); for (int i = 0; i < list.size(); i++) { String ip = list.get(i); addBlackList(ip); System.out.println("读取黑名单一条:" + ip); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void run() { // TODO Auto-generated method stub try { processConnector(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void stop() { selector.wakeup(); stop = true; workThread.stop(); } /** 等待客户端连接 **/ private void processConnector() throws IOException { // 选择ACCEPT连接一组键 ServerSocketChannel server = null; SocketChannel client = null; while (!stop && selector.select() > 0) { // 返回此选择器的已选择键集 Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> keys = set.iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); // 置空迭代器 keys.remove(); // 测试此键的通道是否已准备好接受新的套接字连接 try { if (key.isAcceptable()) { // 返回为之创建此键的通道 server = (ServerSocketChannel) key.channel(); // 接受到此通道套接字的连接 client = server.accept(); String ip = client.socket().getInetAddress() .getHostAddress(); boolean isBlack = inBlackList(ip); System.out.println(ip + "是否黑名字:" + isBlack); if (isBlack) { server.close(); client.close(); key.cancel(); } else { // 调整此通道的阻塞模式 client.configureBlocking(false); // 向给定的选择器注册此通道,返回一个选择键 client.register(workThread.getSelector(), SelectionKey.OP_READ, new ByteArrayOutputStream()); } } } catch (Exception exception) { exception.printStackTrace(); if (key != null) { key.cancel(); key.channel().close(); } } } } } /** * ServerSocketChannel: ServerSocket 的替代类, 支持阻塞通信与非阻塞通信 SocketChannel:Socket * 的替代类, 支持阻塞通信与非阻塞通信 Selector: 为ServerSocketChannel 监控接收客户端连接就绪事件 * 为SocketChannel 监控连接服务器就绪, 读就绪和写就绪事件. SelectionKey: 代表 ServerSocketChannel * 及 SocketChannel 向 Selector 注册事件的句柄.当一个 SelectionKey 对象位于Selector 对象的 * selected-keys 集合中时, 就表示与这个SelectionKey 对象相关的事件发生了 * SelectionKey.OP_ACCEPT->客户端连接就绪事件 等于监听serversocket.accept() 返回一个socket * SelectionKey.OP_CONNECT ->准备连接服务器就绪跟上面类似,只不过是对于 socket的 相当于监听了 * socket.connect() SelectionKey.OP_READ->读就绪事件, 表示输入流中已经有了可读数据, 可以执行读操作了 * SelectionKey.OP_WRITE->写就绪事件 */ }
SocketWorkThread负责读取数据再分到写入线程
package com.soft.nio.connector.socket; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import com.soft.nio.connector.core.ThreadPool; public class SocketWorkThread implements Runnable { /** selector专门用来监听client是否有数据传过来,可读 **/ protected Selector selector; protected boolean stop; protected SocketWorkThread() { try { selector = Selector.open(); ThreadPool.getInstance().execute(this); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void stop() { this.stop = true; } @Override public void run() { // TODO Auto-generated method stub // selector进入read,write非阻塞状态 不能用while(select()>0)循环 // 没有数据会跳出循环 try { while (!stop) { listener(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.println("读取关闭异常"); } } /** 监听是否有数据可读取 **/ protected void listener() throws IOException { SelectionKey selectionKey = null; try { if (selector.select(10L) > 0) { Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> iterator = set.iterator(); while (iterator.hasNext()) { selectionKey = (SelectionKey) iterator.next(); iterator.remove();// 用一个清除一个 readBufer(selectionKey); } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); if (selectionKey != null) { SocketChannel client = (SocketChannel) selectionKey.channel(); String ip = client.socket().getInetAddress().getHostAddress(); System.out.println(ip + "离线了"); client.socket().close(); client.close(); selectionKey.cancel(); } } } /** 读取数据 **/ protected void readBufer(SelectionKey selectionKey) throws IOException { // /////监听连接线程丢到读取线程/////////// SocketChannel client = (SocketChannel) selectionKey.channel(); ByteArrayOutputStream bos = (ByteArrayOutputStream) selectionKey .attachment(); ByteBuffer buffer = ByteBuffer.allocate(10240);// 10kb缓存 int actual = 0; while ((actual = client.read(buffer)) > 0) { buffer.flip(); int limit = buffer.limit(); byte b[] = new byte[limit]; buffer.get(b); bos.write(b); buffer.clear();// 清空 } if (actual < 0) { // 出现异常 String ip = client.socket().getInetAddress().getHostAddress(); System.out.println(ip + "数据读取异常,连接断开"); client.socket().close(); client.close(); selectionKey.cancel(); return; } ThreadPool.getInstance().execute(new SocketWriteThread(selectionKey)); } public final Selector getSelector() { return selector; } }
SocketWriteThread负责把读取到数据和要返回客户端的数据处理
package com.soft.nio.connector.socket; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import com.soft.nio.connector.server.LogicPro; public class SocketWriteThread implements Runnable { private SelectionKey sk; public SocketWriteThread(SelectionKey s) { sk = s; } public void run() { // TODO Auto-generated method stub try { writeBufer(sk); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.println("写入关闭异常"); } } /** 处理读取到数据和写入返回客户庙数据 **/ protected void writeBufer(SelectionKey selectionKey) throws IOException { // /////写入数据//////// SocketChannel client = null; try { client = (SocketChannel) selectionKey.channel(); ByteArrayOutputStream stream = (ByteArrayOutputStream) selectionKey .attachment(); stream.flush(); // 预写入数据 ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream( arrayOutputStream); // 预读取数据 ByteArrayInputStream arrayInputStream = new ByteArrayInputStream( stream.toByteArray()); DataInputStream inputStream = new DataInputStream(arrayInputStream); String name = LogicPro.class.getName(); LogicPro logic = (LogicPro) Class.forName(name).newInstance(); // 处理客户端读取到数据和服务端要发送的数据 logic.processClientData(inputStream, outputStream, client.socket() .getInetAddress()); inputStream.close(); arrayInputStream.close(); outputStream.flush(); arrayOutputStream.flush(); byte[] data = arrayOutputStream.toByteArray(); outputStream.close(); arrayOutputStream.close(); ByteBuffer writeBuffer = ByteBuffer.wrap(data); while (writeBuffer.hasRemaining()) client.write(writeBuffer); stream.reset();// 清空缓冲区,准备下次缓冲 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); if (client != null) { String ip = client.socket().getInetAddress().getHostAddress(); System.out.println(ip + "数据写入异常,连接断开"); client.socket().close(); client.close(); } selectionKey.cancel(); } } }
下面是开发者使用的,上面全部是封装的内部机制,相当于服务端的通讯内核不需要开发者改动。 LogicPro处理服务端读取到的数据和要写的数据(面对开发者)
package com.soft.nio.connector.server; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.text.SimpleDateFormat; import java.util.Date; import com.soft.nio.connector.core.Logic; /** * 所有的数据交互均在此类中完成 此类由服务端单独分发线程完成 ProcessData 函数不支持异步操作数据流 * * @author leehom */ public class LogicPro implements Logic { public SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public void processClientData(DataInputStream dis, DataOutputStream dos, InetAddress add) throws IOException { // TODO Auto-generated method stub int size = dis.available(); byte[] data = new byte[size]; dis.read(data); System.out.println(new String(data)); String sendStr = sdf.format(new Date()); byte[] tmp = sendStr.getBytes(); byte[] sendData = new byte[data.length + tmp.length]; System.arraycopy(tmp, 0, sendData, 0, tmp.length); System.arraycopy(data, 0, sendData, tmp.length, data.length); dos.write(sendData); } }
SocketServer启动服务
package com.soft.nio.connector.server; import com.soft.nio.connector.socket.SocketConnectThread; public class SocketServer { public static void main(String[] args) { SocketConnectThread.newInstance(8888); } }
SocketClient测试客户端,也可以用SocketChannel加Selector实现,也是非阻塞模式,看项目需求。实现和服务端基本上相似,可参考服务端实现。
package com.soft.nio.connector.server; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; import java.util.Scanner; public class SocketClient implements Runnable { Socket socket; InputStream is; OutputStream os; public SocketClient() { // TODO Auto-generated constructor stub new Thread(this).start(); scanData(); } private void scanData() { try { Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String cmd = scanner.nextLine(); if (cmd.equals("exit")) { scanner.close(); os.close(); is.close(); socket.close(); break; } else if (os != null) { byte[] data = cmd.getBytes(); os.write(data); os.flush(); System.out.println("send:" + cmd); } } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void run() { // TODO Auto-generated method stub try { socket = new Socket("127.0.0.1", 8888); os = socket.getOutputStream(); is = socket.getInputStream(); while (true) { byte[] buf = new byte[512]; int len = is.read(buf); if (len == -1) { System.out.println("断开连接了"); break; } System.out.println("recv:" + new String(buf, 0, len)); } } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub new SocketClient(); } }
源码下载
收藏的用户(0) X
正在加载信息~
推荐阅读
最新回复 (0)
站点信息
- 文章2302
- 用户1336
- 访客10962994
每日一句
Progress starts with one brave step forward.
进步始于一次勇敢的迈步。
进步始于一次勇敢的迈步。
新会员