netty服务器,如何构建一个基于netty的后端服务器
今天给各位分享netty服务器的知识,其中也会对如何构建一个基于netty的后端服务器进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
netty 一个server有可以建多少个连接
1.应用在netty建连接的过程中做了耗时的事;
因此我先dump了应用的线程,看到一切正常,boss线程看起来非常空闲;
2. backlog太小;
首先问了下开发代码里有没有设置过backlog,开发告诉我没设置过,于是我翻了下netty的源码确认下默认值,看到backlog的默认值为读取自系统的/proc/sys/net/core/somaxconn,于是查了下系统的这个值,确认系统的这个值已经是调整过的,设置为了2048,然后确认了系统上的tcp_max_syn_backlog是4096,也就是最后work的会是2048(为什么是这样具体可见这篇文章),也就是说应该是够的。
用ss-s观察连接的状况,看到的是synrecv是0,也印证了上面的不是backlog的问题。
到这一步就彻底傻眼了,不知道该用什么方法排查了,于是开始一堆的google,看到的各种说解决新建连接并发低的解决办法,除了调整backlog外,主要是以下两种:
1关闭tcp_tw_recycle,尝试了(话说这里充分体现了”病急乱投医“的心态,其实我自己都不相信改这参数有用,但想着只是改下参数这种小代价的事,还是试试吧),没任何作用;
2关闭window scaling,也尝试了,一样没任何作用;
查到这个阶段觉得自己已经无法理解了,于是求助了厂内内核团队对网络这块比较精通的同学,然后又求助了“神”,“神“帮忙看了会后,说主要的问题是现在是每epollWait唤醒一次,只建了一个连接,这导致在大量新建连接请求并发的时候,效率不够高,因此我翻了下代码,发现我本机上看到的代码不是这样的,我本机上看到的netty代码在epollWait唤醒后,是会尝试一直去accept的,但这个应用使用的netty确实不是这样,于是查了下应用的jar包库,发现里面有两个版本的netty(一个是3.2.1.Final,一个是3.6.3.Final),3.2.1确实是每次epollWait后就处理一个,于是通知开发同学把3.2.1去掉,满心期待的认为应该是好了,等开发更新好了后,自己也确认了一次epollWait唤醒后会连续处理很多个建立连接的请求,但悲催的还是没解决问题,具体的netty在这块的改造感兴趣的同学可以看看NioServerSocketPipelineSink这个类(重点看select唤醒后)…
到这步,就彻底郁闷了,话说其实到这步的时候我已经被这个问题困扰了2天多了,于是只好继续google,发现又有提到打开syn cookies的建议,于是尝试了下,竟然真的work了,打开了这个参数后新建连接的并发请求轻松超过100+了。
到此以为已经解决了,但很快开发给我反馈,整个集群开启了这个参数后,连接确实是能建上了,但客户端出现了发了请求后,等不到任何响应的现象,当时还不确定服务器端到底有没有收到请求,于是只好又先关闭了这个参数(话说这个我到现在都不明白为什么这个参数打开后,会出现发请求没响应的现象,求高人解答)。
尽管还是没解决,但毕竟有进展,有的进展就是打开了syn cookies后连接就能建上,而syn cookies只有在backlog满了后才会生效,那也就是说还是backlog满了,从kern的日志也能确认syn cookies确实是work了,到这步就觉得诡异了,明明netty用的默认值就是somaxconn,而每秒新建40多个连接,且boss线程还很空闲的情况下显然不应该出现backlog满的现象,这个时候仔细看了下本机查看的netty代码,才发现我看的是netty 4的代码,而应用用的是netty 3.6.3,悲催,赶紧把netty 3.6.3的代码捞下来看了,才发现在3.6.3里backlog的默认值处理时不一样的,在3.6.3里默认值是50,不是netty写的默认值,是java本身,50那估计真的不一定够,于是就通知开发在代码里先强制设置下backlog为1000。
在开发改代码的过程中,还有一个怀疑点想确认,就是既然是backlog满了,为什么看到的synrecv会是0呢,于是再用netstat-na| grep [port]| grep SYN_RECV-c统计了下,结果发现值基本一直是64,java层面默认设置的是50,linux会将这个值调整为大于这个值的2的n次幂的值,那也就是64,好吧,看到这就彻底可以确定真的是因为backlog太小了造成的(只是话说我不明白为什么ss-s统计出来的synrecv会是0呢,求高人解答)。
等开发改完代码重新发布后,稍微增加了点引流测试了下,轻松支撑每秒200+,客户端建立连接后发请求获取响应也完全ok,问题到此宣告解决。
题外话:这应用之所以会比较容易出现较多的synrecv,主要是因为手机网络通常是不太稳定的,另外一个原因是这种对外的都很容易带来攻击,而当时刚好这个应用前面的一个用来防syn flood的由于有bug临时关闭了,所以问题暴露的比较明显。
从这个折腾了4天的case的排查过程,大家可以看到其实如果一开始我仔细确认过应用用的netty版本和我本机看的代码是不一致的话,估计很快就会排查出原因就是backlog值太小造成的,所以说折腾了这么多天其实也是自己造成的,这个告诉自己,以后排查问题的时候一定要对出现问题的应用所在的环境更加清楚的确认。
ps:最后再多啰嗦几句,从这个case还能看到的是netty的版本其实在细节上是一直在改进的,就像这个case里的不同版本的netty在处理连接事件唤醒上,还有backlog的默认值上,所以我一直很强调,对于需要存活很多年的软件而言,选择一个使用范围较广的开源软件是非常重要的,如果自己开发,也许短期能超越,但放到三年、五年这样的范围来看,通常是很难和开源软件去抗衡的(原因是商业公司没多少人会专注在一个领域做三五年的,而开源界这样的人实在是多,说实话,这种case看过太多),所以如果觉得你能做的比开源软件好,还不如去帮助已有的(当然,如果这个领域目前完全没有什么使用面较广的、靠谱的,那自己做一个开源是挺好的),软件的可持续发展能力(除非是一次性软件、做的玩的或就玩个一两年的)是非常非常重要的。
netty 实现的服务器 怎么和c++客户端进行通讯
C++通过socket编程实现服务端与客户端的通讯,代码如下(个人环境下测试正常,如果遇到运行send发送报错,请检查服务器端口是否被占用,调试的时候请先运行服务端程序在运行客服端,一定要加载库函数ws2_32.lib,发送字符时应该多加一个空字符作为结束字符):
服务器端程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Server.cpp: Defines the entry point for the console application.
#include"winsock2.h"
#pragma comment(lib,"ws2_32.lib")
#include<iostream>
using namespace std;
int main(int argc, char* argv[])
{
const int BUF_SIZE= 64;
WSADATA wsd;//WSADATA变量
SOCKET sServer;//服务器套接字
SOCKET sClient;//客户端套接字
SOCKADDR_IN addrServ;;//服务器地址
char buf[BUF_SIZE];//接收数据缓冲区
char sendBuf[BUF_SIZE];//返回给客户端得数据
int retVal;//返回值
//初始化套结字动态库
if(WSAStartup(MAKEWORD(2,2),&wsd)!= 0)
{
cout<<"WSAStartup failed!"<< endl;
return 1;
}
//创建套接字
sServer= socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(INVALID_SOCKET== sServer)
{
cout<<"socket failed!"<< endl;
WSACleanup();//释放套接字资源;
return-1;
}
//服务器套接字地址
addrServ.sin_family= AF_INET;
addrServ.sin_port= htons(4999);
addrServ.sin_addr.s_addr= INADDR_ANY;
//绑定套接字
retVal= bind(sServer,(LPSOCKADDR)&addrServ, sizeof(SOCKADDR_IN));
if(SOCKET_ERROR== retVal)
{
cout<<"bind failed!"<< endl;
closesocket(sServer);//关闭套接字
WSACleanup();//释放套接字资源;
return-1;
}
//开始监听
retVal= listen(sServer, 1);
if(SOCKET_ERROR== retVal)
{
cout<<"listen failed!"<< endl;
closesocket(sServer);//关闭套接字
WSACleanup();//释放套接字资源;
return-1;
}
//接受客户端请求
sockaddr_in addrClient;
int addrClientlen= sizeof(addrClient);
sClient= accept(sServer,(sockaddr FAR*)&addrClient,&addrClientlen);
if(INVALID_SOCKET== sClient)
{
cout<<"accept failed!"<< endl;
closesocket(sServer);//关闭套接字
WSACleanup();//释放套接字资源;
return-1;
}
while(true)
{
//接收客户端数据
ZeroMemory(buf, BUF_SIZE);
retVal= recv(sClient, buf, BUF_SIZE, 0);
if(SOCKET_ERROR== retVal)
{
cout<<"recv failed!"<< endl;
closesocket(sServer);//关闭套接字
closesocket(sClient);//关闭套接字
WSACleanup();//释放套接字资源;
return-1;
}
if(buf[0]=='0')
break;
cout<<"客户端发送的数据:"<< buf<<endl;
cout<<"向客户端发送数据:";
cin>> sendBuf;
send(sClient, sendBuf, strlen(sendBuf), 0);
}
//退出
closesocket(sServer);//关闭套接字
closesocket(sClient);//关闭套接字
WSACleanup();//释放套接字资源;
return 0;
}
客户端程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include"winsock2.h"
#include<iostream>
#pragma comment(lib,"ws2_32.lib")
using namespace std;
BOOL RecvLine(SOCKET s, char* buf);//读取一行数据
int main(int argc, char* argv[])
{
const int BUF_SIZE= 64;
WSADATA wsd;//WSADATA变量
SOCKET sHost;//服务器套接字
SOCKADDR_IN servAddr;//服务器地址
char buf[BUF_SIZE];//接收数据缓冲区
char bufRecv[BUF_SIZE];
int retVal;//返回值
//初始化套结字动态库
if(WSAStartup(MAKEWORD(2,2),&wsd)!= 0)
{
cout<<"WSAStartup failed!"<< endl;
return-1;
}
//创建套接字
sHost= socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(INVALID_SOCKET== sHost)
{
cout<<"socket failed!"<< endl;
WSACleanup();//释放套接字资源
return-1;
}
//设置服务器地址和端口
servAddr.sin_family=AF_INET;
servAddr.sin_addr.s_addr= inet_addr("127.0.0.1");
servAddr.sin_port= htons((short)4999);
int nServAddlen= sizeof(servAddr);
//连接服务器
retVal=connect(sHost,(LPSOCKADDR)&servAddr, sizeof(servAddr));
if(SOCKET_ERROR== retVal)
{
cout<<"connect failed!"<< endl;
closesocket(sHost);//关闭套接字
WSACleanup();//释放套接字资源
return-1;
}
while(true)
{
//向服务器发送数据
ZeroMemory(buf, BUF_SIZE);
cout<<"向服务器发送数据:";
cin>> buf;
retVal= send(sHost, buf, strlen(buf), 0);
if(SOCKET_ERROR== retVal)
{
cout<<"send failed!"<< endl;
closesocket(sHost);//关闭套接字
WSACleanup();//释放套接字资源
return-1;
}
//RecvLine(sHost, bufRecv);
ZeroMemory(bufRecv, BUF_SIZE);
recv(sHost, bufRecv,BUF_SIZE, 0);//接收服务器端的数据,只接收5个字符
cout<< endl<<"从服务器接收数据:"<< bufRecv;
cout<<"\n";
}
//退出
closesocket(sHost);//关闭套接字
WSACleanup();//释放套接字资源
return 0;
}
如何构建一个基于netty的后端服务器
直接上干货,这个是前奏,比较山寨的实现,大家可先自行看下
https://github.com/xiaotutiger/miyue-music-service/tree/master/miyue-music-service
下面将分析手头上一个项目,运用的技术很全,值得学习,先做一个简单介绍,当然业务部分代码就不讲了。
整个工程采用maven来管理,主要的技术是spring+jedis+netty+disruptor.看这个组合,这个服务器端性能应该很不错。
这个工程又引发我对技术无限热爱,哈哈。
这
个工程,目前主要是针对一些基于json/xml/text格式的请求,同时也是支持标准手机请求的,当然,可以自定义一些其他格式或者pc端的请求,而
且针对不同URI,后面挂了不同的handler,这些可能都是一些web处理的基本思想,只是脱离了常规的web容器或者应用服务器。
xml工具采用xstram来处理,两个字,方便。
json工具采用jackson\不知道和业界出名的fastjson\gson\sf.json有何区别,待鉴定。
客
户端的请求,统一继承ClientRequestModel,经过编码统一转化为domainMessage,交由disruptor来处理,其实oop
里什么继承,实现,封装思想,大部分都在围绕一个东西在走,一句话,把看似各有棱角的东西如何转化为共同的东西,求同存异啊(比如,水,石头,空气等,如
果在这一层,我们没法统一用一个特征来表示,我们可以先把它转化为分子,那是不是可以用同一个东西来表示呢?如何高度抽象封装,这真是一门艺术)。
看这个工程对客户端请求,是如何一步步处理的,message->request->event交由disruptor来处理,很美妙的思想。在了解这些之前,我们有必要深入学习一下disruptor,很特别的一个框架,宣言很牛逼,中文文档在这里(http://ifeve.com/dissecting-disruptor-whats-so-special/),E文好的同学请移步到这里(http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html)
了解disruptor之前,先学习下ringbuffer是如何实现的?
1、ringbuffer的特别之处:
只有一个指针,没有尾指针,基于数组,且不会删除元素,元素会覆盖,充分利用缓存行,减少垃圾回收。
2、如何从ringbuffer读取数据:
------------------------------------------2013-9-9补充-----------------------------------------------------
下面主要讲一下请求如何处理这块架构吧,其实架构这个东西,说简单一点,就是一种简单可扩展的实现方式,在某些程度上,不要太在意性能。
底层通信建立在netty之上,基本没做任何改动
Java代码
public class HttpServerPipelineFactory implements ChannelPipelineFactory{
private ChannelUpstreamHandler channelUpstreamHandler;
public ChannelPipeline getPipeline() throws Exception{
// Create a default pipeline implementation.
ChannelPipeline pipeline= pipeline();
// Uncomment the following line if you want HTTPS
//SSLEngine engine= SecureChatSslContextFactory.getServerContext().createSSLEngine();
//engine.setUseClientMode(false);
//pipeline.addLast("ssl", new SslHandler(engine));
pipeline.addLast("decoder", new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks.
pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
// Remove the following line if you don't want automatic content compression.
pipeline.addLast("deflater", new HttpContentCompressor());
//pipeline.addLast("handler", new HttpRequestHandler());
pipeline.addLast("handler", channelUpstreamHandler);
return pipeline;
}
public void setChannelUpstreamHandler(ChannelUpstreamHandler channelUpstreamHandler){
this.channelUpstreamHandler= channelUpstreamHandler;
}
}
相关spring配置
Java代码
<bean id="httpServerPipelineFactory" class="com.yunchao.cm.network.http.HttpServerPipelineFactory">
<property name="channelUpstreamHandler" ref="httpRequestHandler"/>
</bean>
Java代码
<bean id="httpRequestHandler" class="com.yunchao.cm.network.http.HttpRequestHandler">
<property name="urlMaps">
<map>
<entry key="/payorder">
<ref bean="payOrderCodecFactory"/>
</entry>
<entry key="/question">
<ref bean="questionCodecFactory"/>
</entry>
<entry key="/sms">
<ref bean="smsCodecFactory"/>
</entry>
代码太多,不全部贴出来,后面整理一下放到我的github上去。
基如此,我们还是得定义一个handler,继承simpleChannelUpstreamHander,并重写了messageReceied方法,具体在这里。
Java代码
QueryStringDecoder queryStringDecoder= new QueryStringDecoder(request.getUri());
String url= queryStringDecoder.getPath();
CodecFactory codecFactory= urlMaps.get(url);
if(null== codecFactory){
logger.error("unsupported url:{} request.", url);
//sendError(ctx, BAD_REQUEST);
e.getChannel().close();
return;
}
//获取cmwap网络中的手机号码
String phone= PhoneUtils.getPhone(request.getHeader("x-up-calling-line-id"));
if(request.getMethod().equals(HttpMethod.POST)){
ChannelBuffer content= request.getContent();
String postParams= content.toString(CharsetUtil.UTF_8);
logger.debug("request content:{}", postParams);
ClientRequestModel model=(ClientRequestModel) codecFactory.decode(postParams);
model.setProperty(model.MESSAGE_EVENT_KEY, e);
model.setProperty(model.HTTP_REQUEST_KEY, request);
model.setProperty(model.HTTP_PHONE_KEY, phone);
InetSocketAddress remoteAddress=(InetSocketAddress) e.getRemoteAddress();
model.setProperty(model.IP_KEY, remoteAddress.getAddress().getHostAddress());
logger.info("user request model:{}", model);
model.fireSelf();
Java代码
@Override
public DomainMessage fireSelf(){
DomainMessage em= new DomainMessage(this);
EventUtils.fireEvent(em,"alipayNotifyState");
return em;
}
看到这里基本上能够清楚了,是如何把客户端请求包装成ClientRequestModel了,且后面涉及到处理的对象,全部继承它,在整个架构之
中,has a优于 is
a,对于客户端netty的一些对象,也是存储在ClientRequestModel中,codec无非也是采用了xml/json/kv,如斯,实现
了字节与对象之间的转换。
除
此之外,突然想到刚来杭州工作的第一家公司,基于此,采用的架构师servlet充当服务器,因为这是一个公司内部的server,而不是一个平台,采用
的数据格式也比较单一,就是xml,但是采用的外部类库也是xstream来处理的,但是整个系统维持的日调用量也是在百万级别,运用的client则是
采用httpclient,对于不同请求后面挂的handler,是在容器启动时加载到内存中,其余也没有什么亮点了。
文章分享结束,netty服务器和如何构建一个基于netty的后端服务器的答案你都知道了吗?欢迎再次光临本站哦!