HTTP 是基于请求/响应模式的:客户端向服务器发送一个 HTTP 请求,然后服务器将会返回一个 HTTP 响应。Netty 提供了多种编码器和解码器以简化对这个协议的使用。一个 HTTP 请求/响应可能由多个数据部分组成,FullHttpRequest 和 FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequest、LastHttpContent 等等)都实现了 HttpObject 接口。
(1) HttpRequestEncoder 将 HttpRequest、HttpContent 和 LastHttpContent 消息编码为字节。
(2) HttpResponseEncoder 将 HttpResponse、HttpContent 和 LastHttpContent 消息编码为字节。
(3) HttpRequestDecoder 将字节解码为 HttpRequest、HttpContent 和 LastHttpContent 消息。
(4) HttpResponseDecoder 将字节解码为 HttpResponse、HttpContent 和 LastHttpContent 消息。
(5) HttpClientCodec 和 HttpServerCodec 则将请求和响应的编解码器做了组合:HttpClientCodec 负责请求编码和响应解码,HttpServerCodec 负责请求解码和响应编码。
由于 HTTP 的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器 HttpObjectAggregator,它可以将多个消息部分合并为 FullHttpRequest 或者 FullHttpResponse 消息。通过这样的方式,你将总是看到完整的消息内容。
当使用 HTTP 时,建议开启压缩功能以尽可能多地减小传输数据的大小。虽然压缩会带来一些 CPU 时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说。Netty 为压缩和解压缩提供了 ChannelHandler 实现,它们同时支持 gzip 和 deflate 编码。
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.28.Final</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
        <scope>provided</scope>
    </dependency>

    <!-- Utilities -->
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.4</version>
    </dependency>

    <!-- Logging -->
    <!-- slf4j-api is kept at the same version as the slf4j-simple binding below;
         SLF4J advises against mixing API and binding versions. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.6.2</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Compile for Java 8 source and bytecode level. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
/**
 * Connection constants for the demo: the host address and the TCP port.
 */
public class HttpConsts {

    /** Host address (IPv4 loopback). */
    public static final String HOST = "127.0.0.1";

    /** TCP port number. */
    public static final Integer PORT = 8888;

    /** Constants holder; not meant to be instantiated. */
    private HttpConsts() {
    }
}
2.3.1 HttpServer
/**
 * Entry point of the demo HTTP server built on Netty NIO transport.
 */
@Slf4j
public class HttpServer {

    public static void main(String[] args) throws InterruptedException {
        new HttpServer().start();
    }

    /**
     * Binds the server to {@link HttpConsts#PORT} and blocks until the server
     * channel is closed. Both event loop groups are shut down on exit.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the bind or for the channel to close
     */
    public void start() throws InterruptedException {
        // One thread accepts connections; the default-sized group handles I/O.
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new HttpServerHandlerInitial());

            ChannelFuture bindFuture = bootstrap.bind(HttpConsts.PORT).sync();
            log.info("服务器已开启......");

            // Block the calling thread until the listening channel goes away.
            bindFuture.channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
2.3.2 HttpServerBusinessHandler
@Slf4j public class HttpServerBusinessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //通过编解码器把byteBuf解析成FullHttpRequest if (msg instanceof FullHttpRequest) { //获取httpRequest FullHttpRequest httpRequest = (FullHttpRequest) msg; try { //获取请求路径、请求体、请求方法 String uri = httpRequest.uri(); String content = httpRequest.content().toString(CharsetUtil.UTF_8); HttpMethod method = httpRequest.method(); log.info("服务器接收到请求:"); log.info("请求uri:{},请求content:{},请求method:{}", uri, content, method); //响应 String responseMsg = "Hello World"; FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1,HttpResponseStatus.OK, Unpooled.copiedBuffer(responseMsg,CharsetUtil.UTF_8) ); response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } finally { httpRequest.release(); } } } }
2.3.3 HttpServerHandlerInitial
public class HttpServerHandlerInitial extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //http请求编解码器,请求解码,响应编码 pipeline.addLast("serverCodec", new HttpServerCodec()); //http请求报文聚合为完整报文,最大请求报文为10M pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024)); //响应报文压缩 pipeline.addLast("compress", new HttpContentCompressor()); //业务处理handler pipeline.addLast("serverBusinessHandler", new HttpServerBusinessHandler()); } }
2.4.1 HttpClient
/**
 * Entry point of the demo HTTP client built on Netty NIO transport.
 */
public class HttpClient {

    public static void main(String[] args) throws InterruptedException {
        new HttpClient().start();
    }

    /**
     * Connects to {@link HttpConsts#HOST}:{@link HttpConsts#PORT} and blocks
     * until the connection is closed. The event loop group is shut down on exit.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the connect or for the channel to close
     */
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new HttpClientHandlerInitial());

            ChannelFuture connectFuture = bootstrap.connect(HttpConsts.HOST, HttpConsts.PORT).sync();

            // Block the calling thread until the connection goes away.
            connectFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
2.4.2 HttpClientBusinessHandler
@Slf4j public class HttpClientBusinessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //通过编解码器把byteBuf解析成FullHttpResponse if (msg instanceof FullHttpResponse) { FullHttpResponse httpResponse = (FullHttpResponse) msg; HttpResponseStatus status = httpResponse.status(); ByteBuf content = httpResponse.content(); log.info("客户端接收响应信息:"); log.info("status:{},content:{}", status, content.toString(CharsetUtil.UTF_8)); httpResponse.release(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //封装请求信息 URI uri = new URI("/test"); String msg = "Hello"; DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes(CharsetUtil.UTF_8))); //构建http请求 request.headers().set(HttpHeaderNames.HOST, HttpConsts.HOST); request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); // 发送http请求 ctx.writeAndFlush(request); } }
2.4.3 HttpClientHandlerInitial
public class HttpClientHandlerInitial extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //客户端编码、解码器,请求编码,响应解码 pipeline.addLast("clientCodec", new HttpClientCodec()); //http聚合器,将http请求聚合成一个完整报文 pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024)); //http响应解压缩 pipeline.addLast("decompressor", new HttpContentDecompressor()); //业务handler pipeline.addLast("clientBusinessHandler", new HttpClientBusinessHandler()); } }
启动服务端:
启动客户端: