吃透Netty源码系列四十五之HttpObjectAggregator详解
- 聚合消息
- 先介绍一下一些属性
- 结构
- MessageAggregator
- acceptInboundMessage判断类型
- decode真正的聚合
- HttpObjectAggregator的isStartMessage
- HttpObjectAggregator的isLastContentMessage
- HttpObjectAggregator的isAggregated
- HttpObjectAggregator的newContinueResponse
- HttpObjectAggregator的closeAfterContinueResponse
- HttpObjectAggregator的beginAggregation
- appendPartialContent
- HttpObjectAggregator的aggregate
- finishAggregation0
聚合消息
前面我们讲了, 一个HTTP
请求最少也会在HttpRequestDecoder
里分成两次往后传递,第一次是消息行和消息头,第二次是消息体,哪怕没有消息体,也会传一个空消息体。如果发送的消息体比较大的话,可能还会分成好几个消息体来处理,往后传递多次,这样使得我们后续的处理器可能要写多个逻辑判断,比较麻烦,那能不能把消息都整合成一个完整的,再往后传递呢,当然可以,用HttpObjectAggregator
。
先介绍一下一些属性
HTTP
有个头属性Except:100-continue
用来优化服务器和客户端数据传输的,在要发送比较大的数据的时候,不会直接发送,而是会先征求下服务器意见是否可以继续发送数据,服务器可以允许也可以不允许,都应该响应一下。具体介绍可以参考这篇文章。
//接受100-continue,响应状态码100
private static final FullHttpResponse CONTINUE =//Except:100-continue的响应
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
//不接受,响应状态码417 不支持
private static final FullHttpResponse EXPECTATION_FAILED = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.EXPECTATION_FAILED, Unpooled.EMPTY_BUFFER);
//不接受,响应状态码413 消息体太大而关闭连接
private static final FullHttpResponse TOO_LARGE_CLOSE = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);
//不接受,响应状态码413 消息体太大,没关闭连接
private static final FullHttpResponse TOO_LARGE = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);
static {//设定头消息
EXPECTATION_FAILED.headers().set(CONTENT_LENGTH, 0);
TOO_LARGE.headers().set(CONTENT_LENGTH, 0);
TOO_LARGE_CLOSE.headers().set(CONTENT_LENGTH, 0);
TOO_LARGE_CLOSE.headers().set(CONNECTION, HttpHeaderValues.CLOSE);//关闭头信息
}
private final boolean closeOnExpectationFailed;//如果消息过大是否关闭连接,报异常
结构
看到他有4
个泛型,分别对应是聚合HTTP
类型的,HTTP
通用消息请求行和请求头的,HTTP
消息体,HTTP
完整通用消息,包括消息体
对应的父类的泛型就是:
这些类型直接会影响到后续的逻辑判断,所以要弄清楚对应的关系。
MessageAggregator
主要的逻辑代码在这里,这个是通用的模板,里面就是模板方法啦,先看下他的一些属性吧,他会把HTTP
的消息体都封装成一个缓冲区,加到复合缓冲区里。
private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;//最大复合缓冲区组件个数
private final int maxContentLength;//最大消息图长度
private O currentMessage;//当前消息
private boolean handlingOversizedMessage;//是否处理过大消息
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;//累加组件的最大个数
private ChannelHandlerContext ctx;//处理器上下文
private ChannelFutureListener continueResponseWriteListener;// 100-continue响应监听器
private boolean aggregating;//是否正在聚合
acceptInboundMessage判断类型
判断是否是泛型I
类型,也就是我们HttpObjectAggregator
泛型中的HttpObject
类型,是才会处理,否则就不处理。然后会判断是否聚合好了,如果没开始聚合就进行聚合,如果还在聚合就继续。
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
if (!super.acceptInboundMessage(msg)) {//是否是泛型I类型,比如HttpObject类型
return false;
}
@SuppressWarnings("unchecked")
I in = (I) msg;
if (isAggregated(in)) {//是否聚合好了
return false;
}
if (isStartMessage(in)) {//是否是开始聚合
aggregating = true;//开始聚合
return true;
} else if (aggregating && isContentMessage(in)) {//正在内容聚合
return true;
}
return false;
}
decode真正的聚合
这个方法比较长,但是做的事情分那么几个:
- 如果是开始消息,也就不是请求体,那就开始判断是否有
Except:100-continue
头信息,有的话根据长度和是否支持来判断是否要返回响应。之后判断如果前面解码失败,就直接整合消息体返回,否则就创建复合缓冲区,如果是消息体的话就添加进去,然后封装成一个完整的消息类型。 - 如果是消息体了,就加入到复合画冲去里,然后判断是否是最后一个消息体,是的话就进行最后的整合,其实就是设置
Content-Length
头信息。
@Override
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
assert aggregating;
if (isStartMessage(msg)) {//是否是开始消息
handlingOversizedMessage = false;//没处理超大信息
if (currentMessage != null) {//上次的消息没释放
currentMessage.release();
currentMessage = null;
throw new MessageAggregationException();
}
@SuppressWarnings("unchecked")
S m = (S) msg;
// 100-continue需要持续响应
Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
if (continueResponse != null) {//有 100-continue响应
// Cache the write listener for reuse.
ChannelFutureListener listener = continueResponseWriteListener;
if (listener == null) {//不存在监听器要创建一个
continueResponseWriteListener = listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.fireExceptionCaught(future.cause());
}
}
};
}
boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);
//这里会直接刷出去,所以HttpResponseEncoder需要放在这个前面,不然写出去没编码过会报错的
final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);
if (closeAfterWrite) {
future.addListener(ChannelFutureListener.CLOSE);
return;
}
if (handlingOversizedMessage) {
return;
}
} else if (isContentLengthInvalid(m, maxContentLength)) {//消息体长度是否超过了
invokeHandleOversizedMessage(ctx, m);
return;
}
//解码不成功
if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
O aggregated;
if (m instanceof ByteBufHolder) {
aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
} else {
aggregated = beginAggregation(m, EMPTY_BUFFER);
}
finishAggregation0(aggregated);
out.add(aggregated);
return;
}
CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);//创建复合缓冲区
if (m instanceof ByteBufHolder) {//是内容
appendPartialContent(content, ((ByteBufHolder) m).content());
}
currentMessage = beginAggregation(m, content);//开始聚合
} else if (isContentMessage(msg)) {//后面属于消息体聚合
if (currentMessage == null) {//长度超过最大了,直接丢弃了,不处理了
return;
}
//提取内容
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
@SuppressWarnings("unchecked")
final C m = (C) msg;
// 超过最大长度了,处理过大的消息
if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
@SuppressWarnings("unchecked")
S s = (S) currentMessage;
invokeHandleOversizedMessage(ctx, s);
return;
}
//添加新的内容到复合缓冲区
appendPartialContent(content, m.content());
aggregate(currentMessage, m);//整合尾部请求头
final boolean last;//是不是最后一次聚合
if (m instanceof DecoderResultProvider) {//处理解码结果
DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
if (!decoderResult.isSuccess()) {//没解码成功
if (currentMessage instanceof DecoderResultProvider) {
((DecoderResultProvider) currentMessage).setDecoderResult(
DecoderResult.failure(decoderResult.cause()));
}
last = true;
} else {
last = isLastContentMessage(m);//是否是最后的内容
}
} else {
last = isLastContentMessage(m);
}
if (last) {//是最后的
finishAggregation0(currentMessage);
// All done
out.add(currentMessage);
currentMessage = null;
}
} else {
throw new MessageAggregationException();
}
}
HttpObjectAggregator的isStartMessage
对HTTP
来说其实就是判断是否是通用的消息行和消息头信息。
@Override
protected boolean isStartMessage(HttpObject msg) throws Exception {
return msg instanceof HttpMessage;
}
HttpObjectAggregator的isLastContentMessage
是否是最后的内容。
@Override
protected boolean isLastContentMessage(HttpContent msg) throws Exception {
return msg instanceof LastHttpContent;
}
HttpObjectAggregator的isAggregated
是否聚合好了。
@Override
protected boolean isAggregated(HttpObject msg) throws Exception {
return msg instanceof FullHttpMessage;
}
HttpObjectAggregator的newContinueResponse
如果需要100-continue
响应的话,要把100-continue
头设置去掉,不往后传播了。
@Override
protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
Object response = continueResponse(start, maxContentLength, pipeline);
if (response != null) {
start.headers().remove(EXPECT);//如果有100-continue响应,就不用再传播下去了
}
return response;
}
HttpObjectAggregator的continueResponse
这个就是上面说的根据是否支持100-continue
,是否长度超过限制等进行响应。
private static Object continueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
if (HttpUtil.isUnsupportedExpectation(start)) {//不支持Expect头
pipeline.fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
return EXPECTATION_FAILED.retainedDuplicate();
} else if (HttpUtil.is100ContinueExpected(start)) {//支持100-continue请求
if (getContentLength(start, -1L) <= maxContentLength) {
return CONTINUE.retainedDuplicate();//继续
}
pipeline.fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
return TOO_LARGE.retainedDuplicate();//消息体太大
}
return null;
}
HttpObjectAggregator的closeAfterContinueResponse
是否不支持100-continue
后把连接断开。
@Override
protected boolean closeAfterContinueResponse(Object msg) {
return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);
}
HttpObjectAggregator的ignoreContentAfterContinueResponse
如果直接给他报400
的话就要断开了,后面的内容就不忽略了。
@Override
protected boolean ignoreContentAfterContinueResponse(Object msg) {
if (msg instanceof HttpResponse) {
final HttpResponse httpResponse = (HttpResponse) msg;
return httpResponse.status().codeClass().equals(HttpStatusClass.CLIENT_ERROR);
}
return false;
}
HttpObjectAggregator的beginAggregation
开始聚合就是创建一个聚合的类,根据不同情况创建请求还是响应的完整类型。
@Override
protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
assert !(start instanceof FullHttpMessage);
HttpUtil.setTransferEncodingChunked(start, false);
AggregatedFullHttpMessage ret;
if (start instanceof HttpRequest) {
ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);//聚合请求
} else if (start instanceof HttpResponse) {
ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);//聚合响应
} else {
throw new Error();
}
return ret;
}
appendPartialContent
这个就是将内容添加到复合缓冲区里。
private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) {
if (partialContent.isReadable()) {//可读的话就加进去
content.addComponent(true, partialContent.retain());
}
}
HttpObjectAggregator的aggregate
这个就是整合尾部的头信息,因为chunk
协议可能会有尾部头信息的。
@Override
protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception {
if (content instanceof LastHttpContent) {//如果是最后的尾部内容就整合尾部头信息
// Merge trailing headers into the message.
((AggregatedFullHttpMessage) aggregated).setTrailingHeaders(((LastHttpContent) content).trailingHeaders());
}
}
finishAggregation0
完成聚合,标志位也设置为false
了,最后再坚持一遍头信息。
private void finishAggregation0(O aggregated) throws Exception {
aggregating = false;
finishAggregation(aggregated);
}
finishAggregation
最后检查下,如果没设置Content-Length
头的话要设置。
@Override
protected void finishAggregation(FullHttpMessage aggregated) throws Exception {
if (!HttpUtil.isContentLengthSet(aggregated)) {//没设置Content-Length头的话要设置
aggregated.headers().set(
CONTENT_LENGTH,
String.valueOf(aggregated.content().readableBytes()));
}
}
基本上所有的方法都讲了,其实说白了,就是把先到的包保存下来,等最后接收完了一起传递给后面的。其他的一些异常什么的就不说了,自己看看就好了。最后要注意用的时候,这个放到HttpResponseEncoder
后面,否则他出站的错误消息不经过HttpResponseEncoder
响应解码器,底层传输是不支持的:
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
转载:https://blog.csdn.net/wangwei19871103/article/details/104613277