小言_互联网的博客

Apache HTTPClient 源码解析:主流程

624人阅读  评论(0)

使用示例

最近使用 HTTPClient 踩了不少坑,故在此总结下。本文基于 HTTPClient 4.5.6 进行分析,分析源码之前,先贴下用法示例:

public class HttpUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtil.class);
    private static final int MAX_TIMEOUT = 400;
    private static final CloseableHttpClient httpClient;

    static {
    	// 自定义 SSL 策略
        Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", createSSLConnSocketFactory())
                .build();
        // 设置连接池
        PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager(registry);
        connMgr.setMaxTotal(100); // 设置连接池大小
        connMgr.setDefaultMaxPerRoute(connMgr.getMaxTotal());
        connMgr.setValidateAfterInactivity(60000); // 设置长连接

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(MAX_TIMEOUT)                   // 连接超时
                .setSocketTimeout(MAX_TIMEOUT)                    // 传输超时
                .setConnectionRequestTimeout(MAX_TIMEOUT)         // 设置从连接池获取连接实例的超时
                .build();

        httpClient = HttpClients.custom()
                .setConnectionManager(connMgr)
                .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
                .setRetryHandler(new MyHttpRequestRetryHandler())  // 重试 1 次
                .setDefaultRequestConfig(requestConfig)
                .build();
    }

    public static byte[] doGet(String url) throws IOException {
        HttpGet httpGet = new HttpGet(url);
        CloseableHttpResponse response = null;
        try {
            response = httpClient.execute(httpGet);
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != HttpStatus.SC_OK) {
                String message = EntityUtils.toString(response.getEntity());
                throw new HttpResponseException(statusCode, message);
            }
            byte[] bytes = EntityUtils.toByteArray(response.getEntity());
            return bytes;
        } catch (Exception e) {
            LOGGER.error("doGet error, url: {}", url, e);
            throw e;
        } finally {
            if (response != null) {
                try {
                    EntityUtils.consume(response.getEntity());
                } catch (IOException e) {
                }
            }
        }
    }

    private static SSLConnectionSocketFactory createSSLConnSocketFactory() {
        SSLConnectionSocketFactory sslsf = null;
        try {
            SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
                public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                    return true;
                }
            }).build();
            sslsf = new SSLConnectionSocketFactory(sslContext, new HostnameVerifier() {
                @Override
                public boolean verify(String s, SSLSession sslSession) {
                    return true;    // 信任所有证书
                }
            });
        } catch (GeneralSecurityException e) {
            LOGGER.error("createSSLConnSocketFactory error", e);
            throw Throwables.propagate(e);
        }
        return sslsf;
    }
}

使用 HTTPClient 时需要注意几点:

  1. 设置合理的超时时间:连接超时、读取超时最常见,容易被忽略的是从连接池中获取连接的超时时间。
  2. 设置合理的连接池大小:连接池大小和读取耗时、QPS 有关,一般等于峰值 QPS * 耗时(单位是秒)。
  3. 设置合理的长连接有效时间:使用连接池时,默认就使用了长连接,长连接有效时间应该和服务端的长连接有效时间保持一致。如果客户端设置的有效时间过长,则会在服务端连接断开时而客户端依然去请求时导致 NoHttpResponseException。也可以通过设置 RequestConfig.setStaleConnectionCheckEnabled 参数让客户端每次请求之前都检查长连接有效性,但是这样会导致性能的下降。之前在关于长连接里也提到过一些注意点。
  4. 设置合理的重试策略:合理的重试,可以提升应用的可用性。默认的重试策略不会对超时进行重试,然而超时是十分从常见的问题,服务端异常或网络抖动都可能导致超时。这里我自定义重试策略如下所示:
public class MyHttpRequestRetryHandler extends DefaultHttpRequestRetryHandler {
    public MyHttpRequestRetryHandler() {
        super(1, false, Arrays.asList(
                UnknownHostException.class,
                SSLException.class)); // 遇到这两种异常时不重试
    }
}

源码分析

让我们从 httpClient.execute(httpGet) 开始看代码如何执行的吧:

    public CloseableHttpResponse execute(HttpUriRequest request) throws IOException, ClientProtocolException {
        return this.execute(request, (HttpContext)null);
    }
    public CloseableHttpResponse execute(HttpUriRequest request, HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        return this.doExecute(determineTarget(request), request, context);
    }
    protected abstract CloseableHttpResponse doExecute(HttpHost var1, HttpRequest var2, HttpContext var3) throws IOException, ClientProtocolException;

沿着 CloseableHttpClient.execute 执行,走到了 doExecute 方法,这是个虚方法,具体实现在 InternalHttpClient:

    protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request, HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        HttpExecutionAware execAware = null;
        if (request instanceof HttpExecutionAware) {
            execAware = (HttpExecutionAware)request;
        }

        try {
            HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);
            HttpClientContext localcontext = HttpClientContext.adapt((HttpContext)(context != null ? context : new BasicHttpContext()));
            RequestConfig config = null;
            if (request instanceof Configurable) {
                config = ((Configurable)request).getConfig();
            }

            // 省略一些配置

            this.setupContext(localcontext);
            HttpRoute route = this.determineRoute(target, wrapper, localcontext);
            // 继续往下执行
            return this.execChain.execute(route, wrapper, localcontext, execAware);
        } catch (HttpException var9) {
            throw new ClientProtocolException(var9);
        }
    }

doExecute 做了一些包装、配置后,调用 execChain.execute 继续执行,execChain 是一个执行链,其中第一个是 RedirectExec,看名字叫就知道它是专门用来处理请求的重定向的:

    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        final List<URI> redirectLocations = context.getRedirectLocations();
        if (redirectLocations != null) {
            redirectLocations.clear();
        }

        final RequestConfig config = context.getRequestConfig();
        final int maxRedirects = config.getMaxRedirects() > 0 ? config.getMaxRedirects() : 50;
        HttpRoute currentRoute = route;
        HttpRequestWrapper currentRequest = request;
        for (int redirectCount = 0;;) {
        	// 先执行了,才能考虑重定向嘛
            final CloseableHttpResponse response = requestExecutor.execute(
                    currentRoute, currentRequest, context, execAware);
            try {
            	// isRedirected 里就是根据状态码是否 3xx 来判断是否需要重定向
                if (config.isRedirectsEnabled() &&
                        this.redirectStrategy.isRedirected(currentRequest.getOriginal(), response, context)) { 

                    if (redirectCount >= maxRedirects) {
                        throw new RedirectException("Maximum redirects ("+ maxRedirects + ") exceeded");
                    }
                    redirectCount++;

                    final HttpRequest redirect = this.redirectStrategy.getRedirect(
                            currentRequest.getOriginal(), response, context);
                    if (!redirect.headerIterator().hasNext()) {
                        final HttpRequest original = request.getOriginal();
                        redirect.setHeaders(original.getAllHeaders());
                    }
                    currentRequest = HttpRequestWrapper.wrap(redirect);

                    if (currentRequest instanceof HttpEntityEnclosingRequest) {
                        RequestEntityProxy.enhance((HttpEntityEnclosingRequest) currentRequest);
                    }

                    final URI uri = currentRequest.getURI();
                    final HttpHost newTarget = URIUtils.extractHost(uri);
                    if (newTarget == null) {
                        throw new ProtocolException("Redirect URI does not specify a valid host name: " +
                                uri);
                    }

                    // Reset virtual host and auth states if redirecting to another host
                    if (!currentRoute.getTargetHost().equals(newTarget)) {
                        final AuthState targetAuthState = context.getTargetAuthState();
                        if (targetAuthState != null) {
                            this.log.debug("Resetting target auth state");
                            targetAuthState.reset();
                        }
                        final AuthState proxyAuthState = context.getProxyAuthState();
                        if (proxyAuthState != null && proxyAuthState.isConnectionBased()) {
                            this.log.debug("Resetting proxy auth state");
                            proxyAuthState.reset();
                        }
                    }

                    currentRoute = this.routePlanner.determineRoute(newTarget, currentRequest, context);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Redirecting to '" + uri + "' via " + currentRoute);
                    }
                    EntityUtils.consume(response.getEntity());
                    response.close();
                } else {
                	// 如果不需要重定向就直接返回
                    return response;
                }
            } catch (final RuntimeException ex) {
                response.close();
                throw ex;
            } catch (final IOException ex) {
                response.close();
                throw ex;
            } catch (final HttpException ex) {
                try {
                	// 遇到异常时,调用 EntityUtils.consume 清理数据、关闭连接
                    EntityUtils.consume(response.getEntity());
                } catch (final IOException ioex) {
                    this.log.debug("I/O error while releasing connection", ioex);
                } finally {
                    response.close();
                }
                throw ex;
            }
        }
    }

RedirectExec 里调用了 RetryExec,一看就知道是用来做重试的:

    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        final Header[] origheaders = request.getAllHeaders();
        for (int execCount = 1;; execCount++) {
            try {
                return this.requestExecutor.execute(route, request, context, execAware);
            } catch (final IOException ex) {
                if (execAware != null && execAware.isAborted()) {
                    throw ex;
                }
                // 判断是否需要重试,这里就会调用我们自定义的 retryHandler 了
                if (retryHandler.retryRequest(ex, execCount, context)) {
                	// 判断 HTTPRequest 里的数据是否可重复利用,否则不重试
                    if (!RequestEntityProxy.isRepeatable(request)) {
                        throw new NonRepeatableRequestException("Cannot retry request " +
                                "with a non-repeatable request entity", ex);
                    }
                    request.setHeaders(origheaders);
                } else {
                    if (ex instanceof NoHttpResponseException) {
                        final NoHttpResponseException updatedex = new NoHttpResponseException(
                                route.getTargetHost().toHostString() + " failed to respond");
                        updatedex.setStackTrace(ex.getStackTrace());
                        throw updatedex;
                    } else {
                        throw ex;
                    }
                }
            }
        }
    }

RetryExec 里又调用了 ProtocolExec,就快要执行 HTTP 调用了,执行前后要调用之前 HTTPProcessor 进行处理。

    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException,
        HttpException {

        final HttpRequest original = request.getOriginal();
        URI uri = null;
        // 省略 URI 获取
        request.setURI(uri);

        // Re-write request URI if needed
        rewriteRequestURI(request, route);

        final HttpParams params = request.getParams();
        HttpHost virtualHost = (HttpHost) params.getParameter(ClientPNames.VIRTUAL_HOST);
        // HTTPCLIENT-1092 - add the port if necessary
        if (virtualHost != null && virtualHost.getPort() == -1) {
            final int port = route.getTargetHost().getPort();
            if (port != -1) {
                virtualHost = new HttpHost(virtualHost.getHostName(), port,
                    virtualHost.getSchemeName());
            }
        }

        HttpHost target = null;
        if (virtualHost != null) {
            target = virtualHost;
        } else {
            if (uri != null && uri.isAbsolute() && uri.getHost() != null) {
                target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
            }
        }

        // Get user info from the URI
        if (uri != null) {
            final String userinfo = uri.getUserInfo();
            if (userinfo != null) {
                CredentialsProvider credsProvider = context.getCredentialsProvider();
                if (credsProvider == null) {
                    credsProvider = new BasicCredentialsProvider();
                    context.setCredentialsProvider(credsProvider);
                }
                credsProvider.setCredentials(
                        new AuthScope(target),
                        new UsernamePasswordCredentials(userinfo));
            }
        }

        // Run request protocol interceptors
        context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
        context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
        context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);

        // 调用 HttpRequestIntreceptor,设置 header、cookie、user-agent 等
        this.httpProcessor.process(request, context);

        final CloseableHttpResponse response = this.requestExecutor.execute(route, request,
            context, execAware);
        try {
            // Run response protocol interceptors
            context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
            // 执行 HttpResponseInterceptor,默认会执行 ResponseContentEncoding 方法对压缩过的数据进行解压
            this.httpProcessor.process(response, context);
            return response;
        }  // 省略一些异常处理
    }

ProtocolExec 又调用了 MainClientExec,这一步从连接池里获取了连接,调用 requestExecutor 执行,然后判断连接复用策略。

    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        AuthState targetAuthState = context.getTargetAuthState();
        if (targetAuthState == null) {
            targetAuthState = new AuthState();
            context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
        }
        AuthState proxyAuthState = context.getProxyAuthState();
        if (proxyAuthState == null) {
            proxyAuthState = new AuthState();
            context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
        }
        if (request instanceof HttpEntityEnclosingRequest) {
            RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
        }

        Object userToken = context.getUserToken();

        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            } else {
                execAware.setCancellable(connRequest);
            }
        }

        final RequestConfig config = context.getRequestConfig();

        final HttpClientConnection managedConn;
        try {
        	// 从连接池里拿一个连接,只等待指定的超时时间
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RequestAbortedException("Request aborted", interrupted);
        } catch(final ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause == null) {
                cause = ex;
            }
            throw new RequestAbortedException("Request execution failed", cause);
        }

        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
        // 判断是否需要检查连接有效性,4.5.x 版本默认不检查,4.2.x 版本默认检查,该检查对性能影响较大
        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection
            if (managedConn.isOpen()) {
                if (managedConn.isStale()) {
                    managedConn.close();
                }
            }
        }

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            if (execAware != null) {
                execAware.setCancellable(connHolder);
            }

            HttpResponse response;
            for (int execCount = 1;; execCount++) {
                if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                    throw new NonRepeatableRequestException("Cannot retry request " +
                            "with a non-repeatable request entity.");
                }
                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                if (!managedConn.isOpen()) {
                    try {
                    	// 如果连接已经关闭,就重新建立连接
                        establishRoute(proxyAuthState, managedConn, route, request, context);
                    } catch (final TunnelRefusedException ex) {
                        response = ex.getResponse();
                        break;
                    }
                }
                // 设置连接超时时间
                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    managedConn.setSocketTimeout(timeout);
                }

                // 执行请求
                response = requestExecutor.execute(request, managedConn, context);

                // 判断是否要重用连接
                if (reuseStrategy.keepAlive(response, context)) {
                	// 计算连接有效时间,有些服务端会在响应 response 里指定
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    connHolder.markReusable();
                } else {
                    connHolder.markNonReusable();
                }
            }

            if (userToken == null) {
                userToken = userTokenHandler.getUserToken(context);
                context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
            }
            if (userToken != null) {
                connHolder.setState(userToken);
            }

            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            } else {
                return new HttpResponseProxy(response, connHolder);
            }
        } catch (final ConnectionShutdownException ex) {
            final InterruptedIOException ioex = new InterruptedIOException(
                    "Connection has been shut down");
            ioex.initCause(ex);
            throw ioex;
        } // 省略异常处理
    }

MainClientExec 里又调用了 HttpRequestExecutor,这里才真正的开始发送 HTTP 请求,接收 HTTP 响应。

    public HttpResponse execute(HttpRequest request, HttpClientConnection conn, HttpContext context) throws IOException, HttpException {
        try {
        	// 发送数据
            HttpResponse response = this.doSendRequest(request, conn, context);
            if (response == null) {
            	// 接收响应
                response = this.doReceiveResponse(request, conn, context);
            }
            return response;
        } // 省略异常处理
    }
    protected HttpResponse doSendRequest(HttpRequest request, HttpClientConnection conn, HttpContext context) throws IOException, HttpException {
        HttpResponse response = null;
        context.setAttribute("http.connection", conn);
        context.setAttribute("http.request_sent", Boolean.FALSE);
        // 写 HTTP header,最终是调用 SessionOutputBufferImpl -> SocketOutputStream
        conn.sendRequestHeader(request);
        if (request instanceof HttpEntityEnclosingRequest) {
            boolean sendentity = true;
            ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
            if (((HttpEntityEnclosingRequest)request).expectContinue() && !ver.lessEquals(HttpVersion.HTTP_1_0)) {
                conn.flush();
                if (conn.isResponseAvailable(this.waitForContinue)) {
                    response = conn.receiveResponseHeader();
                    if (this.canResponseHaveBody(request, response)) {
                        conn.receiveResponseEntity(response);
                    }

                    int status = response.getStatusLine().getStatusCode();
                    if (status < 200) {
                        if (status != 100) {
                            throw new ProtocolException("Unexpected response: " + response.getStatusLine());
                        }

                        response = null;
                    } else {
                        sendentity = false;
                    }
                }
            }

            if (sendentity) {
            	// 写 http body
                conn.sendRequestEntity((HttpEntityEnclosingRequest)request);
            }
        }

        conn.flush();
        context.setAttribute("http.request_sent", Boolean.TRUE);
        return response;
    }
    protected HttpResponse doReceiveResponse(HttpRequest request, HttpClientConnection conn, HttpContext context) throws HttpException, IOException {
        HttpResponse response = null;
        int statusCode = 0;
        while(response == null || statusCode < 200) {
        	// 读取 response header,最终调用了 SessionInputBufferImpl -> SocketInputStream
            response = conn.receiveResponseHeader();
            // 读取状态码
            statusCode = response.getStatusLine().getStatusCode();
            if (statusCode < 100) {
                throw new ProtocolException("Invalid response: " + response.getStatusLine());
            }
            // 读取 response body
            if (this.canResponseHaveBody(request, response)) {
                conn.receiveResponseEntity(response);
            }
        }

        return response;
    }

转载:https://blog.csdn.net/hustspy1990/article/details/101038687
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场