Socket服务器整体架构概述

Socket服务器主要用于提供高效、稳定的数据处理、消息转发等服务,它直接决定了前台应用程序的性能。我们先从整体上认识一下Socket服务器,Socket服务器从架构上一般分为:网络层、业务逻辑层、会话层、数据访问层,如图:

(一) 网络层
网络层主要用于侦听socket连接、创建socket、接受消息、发送消息、关闭连接。作为socket通信服务器,网络层的性能相当重要,所以我们在设计网络层时,要着重在以下几方面获得突破:最大连接数、最大并发数、秒处理消息数。如何突破呢?下面我为大家介绍几种网络层常用到的一些技术和技巧(具体实现,我将在博文中逐一具体阐述):

1)Buffer管理
每一个SocketAsyncEventArgs对象(以下简称SAEA)在内存中都有其对应的缓存空间,如果不对这些缓存空间进行同一管理,当SAEA对象逐渐增多时,这些SAEA对象的缓存空间会越来越大,它们在系统内存中不是连续的,造成很多内存碎片,而且这些缓存不能重复利用,当创建、销毁SAEA对象时,造成CPU很多额外消耗,影响服务器性能。面对这问题如何解决呢?用Buffer池管理!

2)双工通信
Socket服务器提高通信效率是一个永恒的话题,提高通信效率有很多种方法,双工通信就是其中之一。一个SAEA对象在同一时刻只能用来接收数据或发送数据,有人想,如果一个SAEA对象在同一时刻既能发送数据又能接受数据,那肯定会提高socket通信效率。恩,很有想法!可是你能让你的头在同一时刻既往左转又往右转吗?答案是不行的,那如何实现双工通信呢?既然一个SAEA对象在同一时刻只能做一件事,那我自定义DuplexSAEA对象,在该对象中封装两个SAEA,一个用于接受,一个用于发送,问题不就解决了吗。

3)poolOfAcceptEventArgs
poolOfAcceptEventArgs是个什么东西?它不是个东西,是一个容器,一个容纳AcceptSAEA对象的容器。给你两个socket服务器,你能很快判别两个服务器性能的优异吗?很简单,你瞬间向一台服务器打入5、6万的连接,看看会不会都连上,如果都连上,说明这台socket服务器的并发处理连接的能力还是不错的。那如何提高socket服务器的并发连接能力呢?答案:poolOfAcceptEventArgs!

4)消息队列调度器
消息队列调度器主要分为两种:接受消息队列、发送消息队列。为什么要用消息队列呢?主要是提高socket服务器的吞吐量。首先我们定义一个队列Queue,然后编写N个调度器,不断从队列中调度消息,接受队列调度器用于将消息抛至业务逻辑层处理,发送队列调度器用于调用网络层发送消息接口,向指定端口发送数据。

5)心跳扫描
有一个困惑:客户端连接socket服务器,连接没有断开,但客户端挂了,这样这条连接在socket服务器中就成了钉子户,落地生根不走了!一个钉子户还可以忍受,千千万万个呢?那就崩溃了!怎样解决这个问题呢?定时扫描每条连接,如果该条连接在超时时间内没有IO响应,则关闭它。

6)粘包
服务器在接受消息包时,如果两个数据包同时被你服务器收了怎么办?你会把他当成一个数据包吗?如果一个数据包断了,分成两次被你服务器收了,你会把他们拼接起来吗?这些就是粘包了,怎么解决?正则表达式扫描!

7)多线程编程
Socket服务器的编程就是多线程编程,面对多线程,线程间怎样同步、怎样避免死锁?多线程访问公共资源如何处理,在下面的博文中,我将会为大家具体阐述。

(二) 业务逻辑层
  网络层将解包后的消息包抛至业务逻辑层,业务逻辑层收到消息包后,解析消息类型,然后转入相应的处理流程处理。
  网络层应提供发送消息的接口供业务逻辑层调用,因为网络层不会主动发送消息,发送消息的操作是由业务逻辑层来控制的,所以业务逻辑层应根据具体的业务应用,封装不同功能的发送消息的方法。

(三) 会话层
  会话层主要用于记录在线用户信息,该层隶属于业务逻辑层。既然隶属于业务逻辑层,那为什么还要独立出来呢?这主要为以后分布式开发拓展用,试想,一台服务器最大能支持多少人同时在线?中国有多少人?如果1亿人同时在线,你一台服务器能支持得了吗?答案肯定是否定的,所以要分布式开发。分布式开发涉及到用户信息同步的问题,所以会话层就要独立出来了。

(四) 数据访问层
  数据库执行效率是整个socket服务器的瓶颈?为什么呢?举个例子:假设我们的socket服务器的秒处理消息的条数为3000,每处理一条消息都会保存历史记录,那么,如果数据访问层不想拖网络层的后腿,那么他的执行sql语句的效率也必须达到每秒3000!如果socket服务器和数据库服务器部署在同一网段上,这个速度是没有问题的,但如果数据库服务器部署在外网呢?你的sql语句的执行效率能达到那么高吗?很困难!
  再思考一个问题:如果网络层执行线程和数据库执行线程是同一个线程,那么网络层的处理必须等待数据库执行完毕后,才能进行!如果数据库执行效率比较慢,那对整个socket服务器将是一个毁灭性的打击。
  那么怎样将数据访问层与网络层分离,让他们互不影响?如何提高数据库执行效率,让网络层的处理速度和数据访问层的处理速度达到一个平衡?答案:连接池+sql调度器+主从数据库。

Socket服务器的整体架构就为大家介绍到这里,下面我将会为大家具体阐述各个技术的实现。
原文:
http://www.cnblogs.com/tianzhiliang/archive/2010/10/28/1863684.html

Java NIO Socket实现C/S架构

一般的Server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package thread.socket;  
import java.io.*;
import java.net.*;
import java.util.*;
public class Server extends ServerSocket {
private static final int SERVER_PORT = 10000;
private List<WorkerThread> workers;

public Server() throws IOException {
super(SERVER_PORT);
workers = new LinkedList<WorkerThread>();

try {
System.out.println("server is listening...");
while (true) {
Socket socket = accept();
workers.add(new WorkerThread(socket));
System.out.println(String.format("new worker created, total %d", getWorkerCount()));
}
} catch (IOException e) {
} finally {
close();
}
}
public synchronized int getWorkerCount() {
return workers.size();
}
public static void main(String[] args) throws IOException {
new Server();
}
}
// --- WorkerThread
class WorkerThread extends Thread {
private Socket client;
private BufferedReader in;
private PrintWriter out;
public WorkerThread(Socket s) throws IOException {
System.out.println(String.format("create a new thread. %s", s));
client = s;
in = new BufferedReader(new InputStreamReader(client
.getInputStream(), "GB2312"));
out = new PrintWriter(client.getOutputStream(), true);
out.println("--- Welcome ---" + client.getRemoteSocketAddress());
start();
}
public void run() {
try {
String line = in.readLine();
while (!line.equals("bye")) {
System.out.println("client " + client.getRemoteSocketAddress() + " says: " + line);
String msg = createMessage(line);
out.println(msg);
line = in.readLine();
}
out.println("bye");
System.out.println("client " + client.getRemoteSocketAddress() + " quit");
client.close();
} catch (IOException e) {
}
}
private String createMessage(String line) {
// ;
return "response to " + line;
}
}

使用NIO的Server端 (从1.5开始,Java对InputStream/OutputStream 进行了重新改写,用的就是NIO,因此,就算你不显示声明要用NIO,只要你的类继承了InputStream/OutputStream就已经在用NIO了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import java.io.BufferedWriter;  
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
public class SelectorServer
{
private static int DEFAULT_SERVERPORT = 6018;//默认端口
private static int DEFAULT_BUFFERSIZE = 1024;//默认缓冲区大小为1024字节
private static String DEFAULT_CHARSET = "GB2312";//默认码集
private static String DEFAULT_FILENAME = "bigfile.dat";
private ServerSocketChannel channel;
private LinkedList<SocketChannel> clients;
private Selector selector;//选择器
private ByteBuffer buffer;//字节缓冲区
private int port;
private Charset charset;//字符集
private CharsetDecoder decoder;//解码器


public SelectorServer(int port) throws IOException
{
this.port = port;
this.clients = new LinkedList<SocketChannel>();
this.channel = null;
this.selector = Selector.open();//打开选择器
this.buffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE);
this.charset = Charset.forName(DEFAULT_CHARSET);
this.decoder = this.charset.newDecoder();

}

private class HandleClient
{
private String strGreeting = "welcome to VistaQQ";
public HandleClient() throws IOException
{
}
public String readBlock()
{//读块数据
return this.strGreeting;
}
public void close()
{

}
}
protected void handleKey(SelectionKey key) throws IOException
{//处理事件
if (key.isAcceptable())
{ // 接收请求
ServerSocketChannel server = (ServerSocketChannel) key.channel();//取出对应的服务器通道
SocketChannel channel = server.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);//客户socket通道注册读操作
}
else if (key.isReadable())
{ // 读信息
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(this.buffer);
if (count > 0)
{
this.buffer.flip();
CharBuffer charBuffer = decoder.decode(this.buffer);
System.out.println("Client >>" + charBuffer.toString());
SelectionKey wKey = channel.register(selector,
SelectionKey.OP_WRITE);//为客户sockt通道注册写操作
wKey.attach(new HandleClient());
}
else
{//客户已经断开
channel.close();
}
this.buffer.clear();//清空缓冲区
}
else if (key.isWritable())
{ // 写事件
SocketChannel channel = (SocketChannel) key.channel();
HandleClient handle = (HandleClient) key.attachment();//取出处理者
ByteBuffer block = ByteBuffer.wrap(handle.readBlock().getBytes());
channel.write(block);
// channel.socket().getInputStream().(block);
// PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
// channel.socket().getOutputStream())), true);
// out.write(block.toString());
}
}
public void listen() throws IOException
{ //服务器开始监听端口,提供服务
ServerSocket socket;
channel = ServerSocketChannel.open(); // 打开通道
socket = channel.socket(); //得到与通到相关的socket对象
socket.bind(new InetSocketAddress(port)); //将scoket榜定在制定的端口上
//配置通到使用非阻塞模式,在非阻塞模式下,可以编写多道程序同时避免使用复杂的多线程
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
try
{
while(true)
{// 与通常的程序不同,这里使用channel.accpet()接受客户端连接请求,而不是在socket对象上调用accept(),这里在调用accept()方法时如果通道配置为非阻塞模式,那么accept()方法立即返回null,并不阻塞
this.selector.select();
Iterator iter = this.selector.selectedKeys().iterator();
while(iter.hasNext())
{
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
this.handleKey(key);

}
}
}
catch(IOException ex)
{
ex.printStackTrace();
}
}
public static void main(String[] args) throws IOException
{
System.out.println("服务器启动");
SelectorServer server = new SelectorServer(SelectorServer.DEFAULT_SERVERPORT);
server.listen(); //服务器开始监听端口,提供服务
}
}

使用NIO的Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package thread.socket;  
import java.io.*;
import java.net.*;
public class Client {
Socket socket;
BufferedReader in;
PrintWriter out;
public Client() {
try {
socket = new Socket("127.0.0.1", 10000);
in = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader line = new BufferedReader(new InputStreamReader(
System.in));
String cmd = "";
while (!cmd.equals("bye")) {
System.out.println("server says: " + in.readLine());
out.println(cmd = line.readLine());
}
System.out.println("socket " + socket + " stop");
line.close();
out.close();
in.close();
socket.close();
} catch (IOException e) {
}
}
public static void main(String[] args) {
new Client();
}
}