使用示例
最近使用 HTTPClient 踩了不少坑,故在此总结下。本文基于 HTTPClient 4.5.6 进行分析,分析源码之前,先贴下用法示例:
public class HttpUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtil.class);
private static final int MAX_TIMEOUT = 400;
private static final CloseableHttpClient httpClient;
static {
// 自定义 SSL 策略
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", createSSLConnSocketFactory())
.build();
// 设置连接池
PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager(registry);
connMgr.setMaxTotal(100); // 设置连接池大小
connMgr.setDefaultMaxPerRoute(connMgr.getMaxTotal());
connMgr.setValidateAfterInactivity(60000); // 设置长连接
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(MAX_TIMEOUT) // 连接超时
.setSocketTimeout(MAX_TIMEOUT) // 传输超时
.setConnectionRequestTimeout(MAX_TIMEOUT) // 设置从连接池获取连接实例的超时
.build();
httpClient = HttpClients.custom()
.setConnectionManager(connMgr)
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
.setRetryHandler(new MyHttpRequestRetryHandler()) // 重试 1 次
.setDefaultRequestConfig(requestConfig)
.build();
}
public static byte[] doGet(String url) throws IOException {
HttpGet httpGet = new HttpGet(url);
CloseableHttpResponse response = null;
try {
response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
String message = EntityUtils.toString(response.getEntity());
throw new HttpResponseException(statusCode, message);
}
byte[] bytes = EntityUtils.toByteArray(response.getEntity());
return bytes;
} catch (Exception e) {
LOGGER.error("doGet error, url: {}", url, e);
throw e;
} finally {
if (response != null) {
try {
EntityUtils.consume(response.getEntity());
} catch (IOException e) {
}
}
}
}
private static SSLConnectionSocketFactory createSSLConnSocketFactory() {
SSLConnectionSocketFactory sslsf = null;
try {
SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
return true;
}
}).build();
sslsf = new SSLConnectionSocketFactory(sslContext, new HostnameVerifier() {
@Override
public boolean verify(String s, SSLSession sslSession) {
return true; // 信任所有证书
}
});
} catch (GeneralSecurityException e) {
LOGGER.error("createSSLConnSocketFactory error", e);
throw Throwables.propagate(e);
}
return sslsf;
}
}
使用 HTTPClient 时需要注意几点:
- 设置合理的超时时间:连接超时、读取超时最常见,容易被忽略的是从连接池中获取连接的超时时间。
- 设置合理的连接池大小:连接池大小和读取耗时、QPS 有关,一般等于峰值 QPS * 耗时(单位是秒)。
- 设置合理的长连接有效时间:使用连接池时,默认就使用了长连接,长连接有效时间应该和服务端的长连接有效时间保持一致。如果客户端设置的有效时间过长,则会在服务端连接断开时而客户端依然去请求时导致 NoHttpResponseException。也可以通过设置 RequestConfig.setStaleConnectionCheckEnabled 参数让客户端每次请求之前都检查长连接有效性,但是这样会导致性能的下降。之前在关于长连接里也提到过一些注意点。
- 设置合理的重试策略:合理的重试,可以提升应用的可用性。默认的重试策略不会对超时进行重试,然而超时是十分从常见的问题,服务端异常或网络抖动都可能导致超时。这里我自定义重试策略如下所示:
public class MyHttpRequestRetryHandler extends DefaultHttpRequestRetryHandler {
public MyHttpRequestRetryHandler() {
super(1, false, Arrays.asList(
UnknownHostException.class,
SSLException.class)); // 遇到这两种异常时不重试
}
}
源码分析
让我们从 httpClient.execute(httpGet) 开始看代码如何执行的吧:
public CloseableHttpResponse execute(HttpUriRequest request) throws IOException, ClientProtocolException {
return this.execute(request, (HttpContext)null);
}
public CloseableHttpResponse execute(HttpUriRequest request, HttpContext context) throws IOException, ClientProtocolException {
Args.notNull(request, "HTTP request");
return this.doExecute(determineTarget(request), request, context);
}
protected abstract CloseableHttpResponse doExecute(HttpHost var1, HttpRequest var2, HttpContext var3) throws IOException, ClientProtocolException;
沿着 CloseableHttpClient.execute 执行,走到了 doExecute 方法,这是个虚方法,具体实现在 InternalHttpClient:
protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request, HttpContext context) throws IOException, ClientProtocolException {
Args.notNull(request, "HTTP request");
HttpExecutionAware execAware = null;
if (request instanceof HttpExecutionAware) {
execAware = (HttpExecutionAware)request;
}
try {
HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);
HttpClientContext localcontext = HttpClientContext.adapt((HttpContext)(context != null ? context : new BasicHttpContext()));
RequestConfig config = null;
if (request instanceof Configurable) {
config = ((Configurable)request).getConfig();
}
// 省略一些配置
this.setupContext(localcontext);
HttpRoute route = this.determineRoute(target, wrapper, localcontext);
// 继续往下执行
return this.execChain.execute(route, wrapper, localcontext, execAware);
} catch (HttpException var9) {
throw new ClientProtocolException(var9);
}
}
doExecute 做了一些包装、配置后,调用 execChain.execute 继续执行,execChain 是一个执行链,其中第一个是 RedirectExec,看名字叫就知道它是专门用来处理请求的重定向的:
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
final List<URI> redirectLocations = context.getRedirectLocations();
if (redirectLocations != null) {
redirectLocations.clear();
}
final RequestConfig config = context.getRequestConfig();
final int maxRedirects = config.getMaxRedirects() > 0 ? config.getMaxRedirects() : 50;
HttpRoute currentRoute = route;
HttpRequestWrapper currentRequest = request;
for (int redirectCount = 0;;) {
// 先执行了,才能考虑重定向嘛
final CloseableHttpResponse response = requestExecutor.execute(
currentRoute, currentRequest, context, execAware);
try {
// isRedirected 里就是根据状态码是否 3xx 来判断是否需要重定向
if (config.isRedirectsEnabled() &&
this.redirectStrategy.isRedirected(currentRequest.getOriginal(), response, context)) {
if (redirectCount >= maxRedirects) {
throw new RedirectException("Maximum redirects ("+ maxRedirects + ") exceeded");
}
redirectCount++;
final HttpRequest redirect = this.redirectStrategy.getRedirect(
currentRequest.getOriginal(), response, context);
if (!redirect.headerIterator().hasNext()) {
final HttpRequest original = request.getOriginal();
redirect.setHeaders(original.getAllHeaders());
}
currentRequest = HttpRequestWrapper.wrap(redirect);
if (currentRequest instanceof HttpEntityEnclosingRequest) {
RequestEntityProxy.enhance((HttpEntityEnclosingRequest) currentRequest);
}
final URI uri = currentRequest.getURI();
final HttpHost newTarget = URIUtils.extractHost(uri);
if (newTarget == null) {
throw new ProtocolException("Redirect URI does not specify a valid host name: " +
uri);
}
// Reset virtual host and auth states if redirecting to another host
if (!currentRoute.getTargetHost().equals(newTarget)) {
final AuthState targetAuthState = context.getTargetAuthState();
if (targetAuthState != null) {
this.log.debug("Resetting target auth state");
targetAuthState.reset();
}
final AuthState proxyAuthState = context.getProxyAuthState();
if (proxyAuthState != null && proxyAuthState.isConnectionBased()) {
this.log.debug("Resetting proxy auth state");
proxyAuthState.reset();
}
}
currentRoute = this.routePlanner.determineRoute(newTarget, currentRequest, context);
if (this.log.isDebugEnabled()) {
this.log.debug("Redirecting to '" + uri + "' via " + currentRoute);
}
EntityUtils.consume(response.getEntity());
response.close();
} else {
// 如果不需要重定向就直接返回
return response;
}
} catch (final RuntimeException ex) {
response.close();
throw ex;
} catch (final IOException ex) {
response.close();
throw ex;
} catch (final HttpException ex) {
try {
// 遇到异常时,调用 EntityUtils.consume 清理数据、关闭连接
EntityUtils.consume(response.getEntity());
} catch (final IOException ioex) {
this.log.debug("I/O error while releasing connection", ioex);
} finally {
response.close();
}
throw ex;
}
}
}
RedirectExec 里调用了 RetryExec,一看就知道是用来做重试的:
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
final Header[] origheaders = request.getAllHeaders();
for (int execCount = 1;; execCount++) {
try {
return this.requestExecutor.execute(route, request, context, execAware);
} catch (final IOException ex) {
if (execAware != null && execAware.isAborted()) {
throw ex;
}
// 判断是否需要重试,这里就会调用我们自定义的 retryHandler 了
if (retryHandler.retryRequest(ex, execCount, context)) {
// 判断 HTTPRequest 里的数据是否可重复利用,否则不重试
if (!RequestEntityProxy.isRepeatable(request)) {
throw new NonRepeatableRequestException("Cannot retry request " +
"with a non-repeatable request entity", ex);
}
request.setHeaders(origheaders);
} else {
if (ex instanceof NoHttpResponseException) {
final NoHttpResponseException updatedex = new NoHttpResponseException(
route.getTargetHost().toHostString() + " failed to respond");
updatedex.setStackTrace(ex.getStackTrace());
throw updatedex;
} else {
throw ex;
}
}
}
}
}
RetryExec 里又调用了 ProtocolExec,就快要执行 HTTP 调用了,执行前后要调用之前 HTTPProcessor 进行处理。
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException,
HttpException {
final HttpRequest original = request.getOriginal();
URI uri = null;
// 省略 URI 获取
request.setURI(uri);
// Re-write request URI if needed
rewriteRequestURI(request, route);
final HttpParams params = request.getParams();
HttpHost virtualHost = (HttpHost) params.getParameter(ClientPNames.VIRTUAL_HOST);
// HTTPCLIENT-1092 - add the port if necessary
if (virtualHost != null && virtualHost.getPort() == -1) {
final int port = route.getTargetHost().getPort();
if (port != -1) {
virtualHost = new HttpHost(virtualHost.getHostName(), port,
virtualHost.getSchemeName());
}
}
HttpHost target = null;
if (virtualHost != null) {
target = virtualHost;
} else {
if (uri != null && uri.isAbsolute() && uri.getHost() != null) {
target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
}
}
// Get user info from the URI
if (uri != null) {
final String userinfo = uri.getUserInfo();
if (userinfo != null) {
CredentialsProvider credsProvider = context.getCredentialsProvider();
if (credsProvider == null) {
credsProvider = new BasicCredentialsProvider();
context.setCredentialsProvider(credsProvider);
}
credsProvider.setCredentials(
new AuthScope(target),
new UsernamePasswordCredentials(userinfo));
}
}
// Run request protocol interceptors
context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
// 调用 HttpRequestIntreceptor,设置 header、cookie、user-agent 等
this.httpProcessor.process(request, context);
final CloseableHttpResponse response = this.requestExecutor.execute(route, request,
context, execAware);
try {
// Run response protocol interceptors
context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
// 执行 HttpResponseInterceptor,默认会执行 ResponseContentEncoding 方法对压缩过的数据进行解压
this.httpProcessor.process(response, context);
return response;
} // 省略一些异常处理
}
ProtocolExec 又调用了 MainClientExec,这一步从连接池里获取了连接,调用 requestExecutor 执行,然后判断连接复用策略。
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
AuthState targetAuthState = context.getTargetAuthState();
if (targetAuthState == null) {
targetAuthState = new AuthState();
context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
}
AuthState proxyAuthState = context.getProxyAuthState();
if (proxyAuthState == null) {
proxyAuthState = new AuthState();
context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
}
if (request instanceof HttpEntityEnclosingRequest) {
RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
}
Object userToken = context.getUserToken();
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
if (execAware != null) {
if (execAware.isAborted()) {
connRequest.cancel();
throw new RequestAbortedException("Request aborted");
} else {
execAware.setCancellable(connRequest);
}
}
final RequestConfig config = context.getRequestConfig();
final HttpClientConnection managedConn;
try {
// 从连接池里拿一个连接,只等待指定的超时时间
final int timeout = config.getConnectionRequestTimeout();
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
} catch(final InterruptedException interrupted) {
Thread.currentThread().interrupt();
throw new RequestAbortedException("Request aborted", interrupted);
} catch(final ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause == null) {
cause = ex;
}
throw new RequestAbortedException("Request execution failed", cause);
}
context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
// 判断是否需要检查连接有效性,4.5.x 版本默认不检查,4.2.x 版本默认检查,该检查对性能影响较大
if (config.isStaleConnectionCheckEnabled()) {
// validate connection
if (managedConn.isOpen()) {
if (managedConn.isStale()) {
managedConn.close();
}
}
}
final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
try {
if (execAware != null) {
execAware.setCancellable(connHolder);
}
HttpResponse response;
for (int execCount = 1;; execCount++) {
if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
throw new NonRepeatableRequestException("Cannot retry request " +
"with a non-repeatable request entity.");
}
if (execAware != null && execAware.isAborted()) {
throw new RequestAbortedException("Request aborted");
}
if (!managedConn.isOpen()) {
try {
// 如果连接已经关闭,就重新建立连接
establishRoute(proxyAuthState, managedConn, route, request, context);
} catch (final TunnelRefusedException ex) {
response = ex.getResponse();
break;
}
}
// 设置连接超时时间
final int timeout = config.getSocketTimeout();
if (timeout >= 0) {
managedConn.setSocketTimeout(timeout);
}
// 执行请求
response = requestExecutor.execute(request, managedConn, context);
// 判断是否要重用连接
if (reuseStrategy.keepAlive(response, context)) {
// 计算连接有效时间,有些服务端会在响应 response 里指定
final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
connHolder.markReusable();
} else {
connHolder.markNonReusable();
}
}
if (userToken == null) {
userToken = userTokenHandler.getUserToken(context);
context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
}
if (userToken != null) {
connHolder.setState(userToken);
}
// check for entity, release connection if possible
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
// connection not needed and (assumed to be) in re-usable state
connHolder.releaseConnection();
return new HttpResponseProxy(response, null);
} else {
return new HttpResponseProxy(response, connHolder);
}
} catch (final ConnectionShutdownException ex) {
final InterruptedIOException ioex = new InterruptedIOException(
"Connection has been shut down");
ioex.initCause(ex);
throw ioex;
} // 省略异常处理
}
MainClientExec 里又调用了 HttpRequestExecutor,这里才真正的开始发送 HTTP 请求,接收 HTTP 响应。
public HttpResponse execute(HttpRequest request, HttpClientConnection conn, HttpContext context) throws IOException, HttpException {
try {
// 发送数据
HttpResponse response = this.doSendRequest(request, conn, context);
if (response == null) {
// 接收响应
response = this.doReceiveResponse(request, conn, context);
}
return response;
} // 省略异常处理
}
protected HttpResponse doSendRequest(HttpRequest request, HttpClientConnection conn, HttpContext context) throws IOException, HttpException {
HttpResponse response = null;
context.setAttribute("http.connection", conn);
context.setAttribute("http.request_sent", Boolean.FALSE);
// 写 HTTP header,最终是调用 SessionOutputBufferImpl -> SocketOutputStream
conn.sendRequestHeader(request);
if (request instanceof HttpEntityEnclosingRequest) {
boolean sendentity = true;
ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
if (((HttpEntityEnclosingRequest)request).expectContinue() && !ver.lessEquals(HttpVersion.HTTP_1_0)) {
conn.flush();
if (conn.isResponseAvailable(this.waitForContinue)) {
response = conn.receiveResponseHeader();
if (this.canResponseHaveBody(request, response)) {
conn.receiveResponseEntity(response);
}
int status = response.getStatusLine().getStatusCode();
if (status < 200) {
if (status != 100) {
throw new ProtocolException("Unexpected response: " + response.getStatusLine());
}
response = null;
} else {
sendentity = false;
}
}
}
if (sendentity) {
// 写 http body
conn.sendRequestEntity((HttpEntityEnclosingRequest)request);
}
}
conn.flush();
context.setAttribute("http.request_sent", Boolean.TRUE);
return response;
}
protected HttpResponse doReceiveResponse(HttpRequest request, HttpClientConnection conn, HttpContext context) throws HttpException, IOException {
HttpResponse response = null;
int statusCode = 0;
while(response == null || statusCode < 200) {
// 读取 response header,最终调用了 SessionInputBufferImpl -> SocketInputStream
response = conn.receiveResponseHeader();
// 读取状态码
statusCode = response.getStatusLine().getStatusCode();
if (statusCode < 100) {
throw new ProtocolException("Invalid response: " + response.getStatusLine());
}
// 读取 response body
if (this.canResponseHaveBody(request, response)) {
conn.receiveResponseEntity(response);
}
}
return response;
}
转载:https://blog.csdn.net/hustspy1990/article/details/101038687
查看评论