Java NIO
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
如果你至今还是在怀疑Java的性能,说明你的思想和观念已经完全落伍了,Java一两年就应该用新的名词来定义。从JDK1.5开始又要提供关于线程、并发等新性能的支持,Java应用在游戏等适时领域方面的机会已经成熟,Java在稳定自己中间件地位后,开始蚕食传统C的领域。那么今天就用Java NIO写一个服务端。
线程池 ThreadPoolExecutor:
ThreadPoolExecutor是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
数据库连接池ComboPooledDataSource:
关于数据库连接池的使用,首先我们要明白我们为什么要用它,对应普通的数据库连接操作,通常会涉及到以下一些操作是比较耗时的: 网络通讯,涉及到网络延时及协议通讯 身份验证,涉及安全性检查 连接合法性检查,主要是检查所连接的数据库是否存在 并发控制机制 构造并初始化输出缓冲区 连接成功后的信息保存,日志存储 服务器性能 数据库配置优化 系统分配内存资源 等等~~~状况,导致数据库连接操作比较耗时,~~~而且每次都得花费0.05s~1s的时间 但是使用连接池技术,本质上就是在一个请求对应的连接,都由一个线程池来维护着,也就是说“上下文切换”的代价是线程级别(所谓的纳秒级),对于大规模的并发访问,就算以每秒几亿级别的访问量都是不成问题的。
上代码
SocketConnectThread 类负责处理客户端的连接,然后分发读取数据任务
SocketWorkThread负责读取数据再分到写入线程
SocketWriteThread负责把读取到数据和要返回客户端的数据处理
下面是开发者使用的,上面全部是封装的内部机制,相当于服务端的通讯内核不需要开发者改动。 LogicPro处理服务端读取到的数据和要写的数据(面对开发者)
SocketServer启动服务
SocketClient测试客户端,也可以用SocketChannel加Selector实现,也是非阻塞模式,看项目需求。实现和服务端基本上相似,可参考服务端实现。
源码下载
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
如果你至今还是在怀疑Java的性能,说明你的思想和观念已经完全落伍了,Java一两年就应该用新的名词来定义。从JDK1.5开始又要提供关于线程、并发等新性能的支持,Java应用在游戏等适时领域方面的机会已经成熟,Java在稳定自己中间件地位后,开始蚕食传统C的领域。那么今天就用Java NIO写一个服务端。
线程池 ThreadPoolExecutor:
ThreadPoolExecutor是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
数据库连接池ComboPooledDataSource:
关于数据库连接池的使用,首先我们要明白我们为什么要用它,对应普通的数据库连接操作,通常会涉及到以下一些操作是比较耗时的: 网络通讯,涉及到网络延时及协议通讯 身份验证,涉及安全性检查 连接合法性检查,主要是检查所连接的数据库是否存在 并发控制机制 构造并初始化输出缓冲区 连接成功后的信息保存,日志存储 服务器性能 数据库配置优化 系统分配内存资源 等等~~~状况,导致数据库连接操作比较耗时,~~~而且每次都得花费0.05s~1s的时间 但是使用连接池技术,本质上就是在一个请求对应的连接,都由一个线程池来维护着,也就是说“上下文切换”的代价是线程级别(所谓的纳秒级),对于大规模的并发访问,就算以每秒几亿级别的访问量都是不成问题的。
上代码
SocketConnectThread 类负责处理客户端的连接,然后分发读取数据任务
package com.soft.nio.connector.socket;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.soft.nio.connector.core.NetTool;
import com.soft.nio.connector.core.ThreadPool;
/**
* 监听连接线程
*
* @author leehom
*
*/
public class SocketConnectThread implements Runnable {
/** selector专门用来监听client连接 **/
protected Selector selector;
protected SocketWorkThread workThread;
private ServerSocketChannel server;
private static SocketConnectThread socketConnectThread;
private boolean stop = false;
public static final int BLOCK = 3;// 100KB数据,多了自动断开连接
private static Map<String, Object> black_map;
private SocketConnectThread(int port) {
super();
// TODO Auto-generated constructor stub
// 初始大小10000
black_map = new HashMap<String, Object>(10000);
try {
// 打开一个选择器
selector = Selector.open();
// 打开服务器套接字通道
server = ServerSocketChannel.open();
// 启用/禁用 SO_REUSEADDR 套接字选项
server.socket().setReuseAddress(true);
// 调整此通道的阻塞模式
server.configureBlocking(false);
// 将 ServerSocket 绑定到特定地址
server.socket().bind(new InetSocketAddress(port));
// 向给定的选择器注册此通道,返回一个选择键
server.register(selector, SelectionKey.OP_ACCEPT);
workThread = new SocketWorkThread();
ThreadPool.getInstance().execute(this);
System.out.println("open socket server->port:" + port);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static SocketConnectThread newInstance(int port) {
if (socketConnectThread == null)
socketConnectThread = new SocketConnectThread(port);
return socketConnectThread;
}
public static SocketConnectThread getInstance() {
return socketConnectThread;
}
public static boolean inBlackList(String ip) {
return black_map.containsKey(ip);
}
public static void addBlackList(String ip) {
black_map.put(ip, null);
}
public static void resetBlackList() {
// 重黑黑名单
System.out.println("重黑黑名单");
black_map.clear();
try {
List<String> list = NetTool.readBlackList();
for (int i = 0; i < list.size(); i++) {
String ip = list.get(i);
addBlackList(ip);
System.out.println("读取黑名单一条:" + ip);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
processConnector();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void stop() {
selector.wakeup();
stop = true;
workThread.stop();
}
/** 等待客户端连接 **/
private void processConnector() throws IOException {
// 选择ACCEPT连接一组键
ServerSocketChannel server = null;
SocketChannel client = null;
while (!stop && selector.select() > 0) {
// 返回此选择器的已选择键集
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> keys = set.iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
// 置空迭代器
keys.remove();
// 测试此键的通道是否已准备好接受新的套接字连接
try {
if (key.isAcceptable()) {
// 返回为之创建此键的通道
server = (ServerSocketChannel) key.channel();
// 接受到此通道套接字的连接
client = server.accept();
String ip = client.socket().getInetAddress()
.getHostAddress();
boolean isBlack = inBlackList(ip);
System.out.println(ip + "是否黑名字:" + isBlack);
if (isBlack) {
server.close();
client.close();
key.cancel();
} else {
// 调整此通道的阻塞模式
client.configureBlocking(false);
// 向给定的选择器注册此通道,返回一个选择键
client.register(workThread.getSelector(),
SelectionKey.OP_READ,
new ByteArrayOutputStream());
}
}
} catch (Exception exception) {
exception.printStackTrace();
if (key != null) {
key.cancel();
key.channel().close();
}
}
}
}
}
/**
* ServerSocketChannel: ServerSocket 的替代类, 支持阻塞通信与非阻塞通信 SocketChannel:Socket
* 的替代类, 支持阻塞通信与非阻塞通信 Selector: 为ServerSocketChannel 监控接收客户端连接就绪事件
* 为SocketChannel 监控连接服务器就绪, 读就绪和写就绪事件. SelectionKey: 代表 ServerSocketChannel
* 及 SocketChannel 向 Selector 注册事件的句柄.当一个 SelectionKey 对象位于Selector 对象的
* selected-keys 集合中时, 就表示与这个SelectionKey 对象相关的事件发生了
* SelectionKey.OP_ACCEPT->客户端连接就绪事件 等于监听serversocket.accept() 返回一个socket
* SelectionKey.OP_CONNECT ->准备连接服务器就绪跟上面类似,只不过是对于 socket的 相当于监听了
* socket.connect() SelectionKey.OP_READ->读就绪事件, 表示输入流中已经有了可读数据, 可以执行读操作了
* SelectionKey.OP_WRITE->写就绪事件
*/
}
SocketWorkThread负责读取数据再分到写入线程
package com.soft.nio.connector.socket;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import com.soft.nio.connector.core.ThreadPool;
public class SocketWorkThread implements Runnable {
/** selector专门用来监听client是否有数据传过来,可读 **/
protected Selector selector;
protected boolean stop;
protected SocketWorkThread() {
try {
selector = Selector.open();
ThreadPool.getInstance().execute(this);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
// TODO Auto-generated method stub
// selector进入read,write非阻塞状态 不能用while(select()>0)循环
// 没有数据会跳出循环
try {
while (!stop) {
listener();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("读取关闭异常");
}
}
/** 监听是否有数据可读取 **/
protected void listener() throws IOException {
SelectionKey selectionKey = null;
try {
if (selector.select(10L) > 0) {
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
selectionKey = (SelectionKey) iterator.next();
iterator.remove();// 用一个清除一个
readBufer(selectionKey);
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (selectionKey != null) {
SocketChannel client = (SocketChannel) selectionKey.channel();
String ip = client.socket().getInetAddress().getHostAddress();
System.out.println(ip + "离线了");
client.socket().close();
client.close();
selectionKey.cancel();
}
}
}
/** 读取数据 **/
protected void readBufer(SelectionKey selectionKey) throws IOException {
// /////监听连接线程丢到读取线程///////////
SocketChannel client = (SocketChannel) selectionKey.channel();
ByteArrayOutputStream bos = (ByteArrayOutputStream) selectionKey
.attachment();
ByteBuffer buffer = ByteBuffer.allocate(10240);// 10kb缓存
int actual = 0;
while ((actual = client.read(buffer)) > 0) {
buffer.flip();
int limit = buffer.limit();
byte b[] = new byte[limit];
buffer.get(b);
bos.write(b);
buffer.clear();// 清空
}
if (actual < 0) {
// 出现异常
String ip = client.socket().getInetAddress().getHostAddress();
System.out.println(ip + "数据读取异常,连接断开");
client.socket().close();
client.close();
selectionKey.cancel();
return;
}
ThreadPool.getInstance().execute(new SocketWriteThread(selectionKey));
}
public final Selector getSelector() {
return selector;
}
}
SocketWriteThread负责把读取到数据和要返回客户端的数据处理
package com.soft.nio.connector.socket;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import com.soft.nio.connector.server.LogicPro;
public class SocketWriteThread implements Runnable {
private SelectionKey sk;
public SocketWriteThread(SelectionKey s) {
sk = s;
}
public void run() {
// TODO Auto-generated method stub
try {
writeBufer(sk);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("写入关闭异常");
}
}
/** 处理读取到数据和写入返回客户庙数据 **/
protected void writeBufer(SelectionKey selectionKey) throws IOException {
// /////写入数据////////
SocketChannel client = null;
try {
client = (SocketChannel) selectionKey.channel();
ByteArrayOutputStream stream = (ByteArrayOutputStream) selectionKey
.attachment();
stream.flush();
// 预写入数据
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream outputStream = new DataOutputStream(
arrayOutputStream);
// 预读取数据
ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(
stream.toByteArray());
DataInputStream inputStream = new DataInputStream(arrayInputStream);
String name = LogicPro.class.getName();
LogicPro logic = (LogicPro) Class.forName(name).newInstance();
// 处理客户端读取到数据和服务端要发送的数据
logic.processClientData(inputStream, outputStream, client.socket()
.getInetAddress());
inputStream.close();
arrayInputStream.close();
outputStream.flush();
arrayOutputStream.flush();
byte[] data = arrayOutputStream.toByteArray();
outputStream.close();
arrayOutputStream.close();
ByteBuffer writeBuffer = ByteBuffer.wrap(data);
while (writeBuffer.hasRemaining())
client.write(writeBuffer);
stream.reset();// 清空缓冲区,准备下次缓冲
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (client != null) {
String ip = client.socket().getInetAddress().getHostAddress();
System.out.println(ip + "数据写入异常,连接断开");
client.socket().close();
client.close();
}
selectionKey.cancel();
}
}
}
下面是开发者使用的,上面全部是封装的内部机制,相当于服务端的通讯内核不需要开发者改动。 LogicPro处理服务端读取到的数据和要写的数据(面对开发者)
package com.soft.nio.connector.server;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.soft.nio.connector.core.Logic;
/**
* 所有的数据交互均在此类中完成 此类由服务端单独分发线程完成 ProcessData 函数不支持异步操作数据流
*
* @author leehom
*/
public class LogicPro implements Logic {
public SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void processClientData(DataInputStream dis, DataOutputStream dos,
InetAddress add) throws IOException {
// TODO Auto-generated method stub
int size = dis.available();
byte[] data = new byte[size];
dis.read(data);
System.out.println(new String(data));
String sendStr = sdf.format(new Date());
byte[] tmp = sendStr.getBytes();
byte[] sendData = new byte[data.length + tmp.length];
System.arraycopy(tmp, 0, sendData, 0, tmp.length);
System.arraycopy(data, 0, sendData, tmp.length, data.length);
dos.write(sendData);
}
}
SocketServer启动服务
package com.soft.nio.connector.server;
import com.soft.nio.connector.socket.SocketConnectThread;
public class SocketServer {
public static void main(String[] args) {
SocketConnectThread.newInstance(8888);
}
}
SocketClient测试客户端,也可以用SocketChannel加Selector实现,也是非阻塞模式,看项目需求。实现和服务端基本上相似,可参考服务端实现。
package com.soft.nio.connector.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
public class SocketClient implements Runnable {
Socket socket;
InputStream is;
OutputStream os;
public SocketClient() {
// TODO Auto-generated constructor stub
new Thread(this).start();
scanData();
}
private void scanData() {
try {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String cmd = scanner.nextLine();
if (cmd.equals("exit")) {
scanner.close();
os.close();
is.close();
socket.close();
break;
} else if (os != null) {
byte[] data = cmd.getBytes();
os.write(data);
os.flush();
System.out.println("send:" + cmd);
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
socket = new Socket("127.0.0.1", 8888);
os = socket.getOutputStream();
is = socket.getInputStream();
while (true) {
byte[] buf = new byte[512];
int len = is.read(buf);
if (len == -1) {
System.out.println("断开连接了");
break;
}
System.out.println("recv:" + new String(buf, 0, len));
}
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
new SocketClient();
}
}
源码下载
收藏的用户(0) X
正在加载信息~
推荐阅读
最新回复 (0)
站点信息
- 文章2314
- 用户1336
- 访客11812443
每日一句
Let's seek joy in the simple, quiet moments.
让我们在简单宁静的时刻中寻找快乐。
让我们在简单宁静的时刻中寻找快乐。