tomcat 调优

查看tomcat 8080端口的tcp连接数

netstat -an| grep 8080 |awk ‘/^tcp/{++S[$NF]}END{for (a in S)print a,S[a]}’

可以查看连接数量,如下图:
tomcat-tcp

现在的业务场景是多客户端,短连接,每个请求的处理时间相对较短,需要较多的线程来处理。相应的需要调优tomcat的并发参数,centos系统的参数调整暂不讨论
tomcat的并发参数通过调整server.xml里面的连接和线程池来获取更大的吞吐能力,一般是调整executor,connector组件
下面的参数都是我自己的理解,可能会有出入,请参考官方英文文档

https://tomcat.apache.org/tomcat-8.5-doc/config/service.html

Executor component

参数名称 类型 说明
className string 继承类org.apache.catalina.Executor接口的线程池额执行类,可以自己实现
name string 关联server.xml 其他的定义好的executor
threadPriority int executor的线程优先级,默认值是5(Thread.NORM_PRIORITY)
daemon booean 是否开启守护线程
namePrefix string 定义线程的头部名称 最后线程名称是namePrefix + threadNumber
maxThreads int 最大线程数,默认值200
minSpareThreads int 核心线程数的数量,默认值是25
maxIdleTime int 空闲线程的关闭事件,单位是ms,如果是活跃线程数量小于或者等于核心线程数不会关闭空闲线程,默认值60000ms
maxQueueSize int 队列最大数值,默认值是int的最大值,当队列满时,提供拒绝策略
prestartminSpareThreads boolean 是否在启动executor时,启动核心线程数量,默认值是false
threadRenewalDelay long 如果ThreadLocalLeakPreventionListener有配置,在恢复线程池里面线程时,未了避免线程同时创建卡住的情况,两个线程之间的间隔时间,默认值是1000ms,设置为负值时,线程不会被重建

Connector HTTP/1.1 component

参数名称 类型 说明
allowTrace boolean 是否支持http trace方法
asyncTimeout int 异步请求的的超时时间,默认是30000ms
defaultSSLHostConfigName string 默认SSLHostConifg的名称
enableLookups boolean 设置为true返回远程机器的hostName,设置为false返回ip地址
maxHeaderCount int default is 100,允许的最大的header里面条目的条数
maxParameterCount int 默认值是10000,get参数+post参数的最大参数
maxPostSize int 默认值是2097152byte(2M),设置post的最大字节数
maxSavePostSize int 在FORM or CLIENT-CERT authentication认证的时候,post的大小,默认是4096byte(4kb)
parseBodyMethods string array 解析哪些请求method到POST方式,默认是POST,可以设置PUT method – 应该是兼容PUT模式时使用
port int 端口号
protocol 协议 默认值是HTTP/1.1 限制支持三种 Http11NioProtocol,Http11Nio2Protocol,Http11AprProtocol
proxyName string 代理名称
proxyPort int 代理端口号
redirectPort int 如果是https 请求8080端口时,强制跳转https端口
scheme string 默认值是http
secure boolean 标记请求是安全的,调用request.isSecure 返回true
sendReasonPhrase boolean 为true时,会在返回中加上原因
URIEncoding boolean true:ISO-8859-1 false:UTF-8
useBodyEncodingForURI boolean false:ISO-8859-1 true:UTF-8
useIPVHosts boolean 为true使用ip address作为绑定
xpoweredBy boolean 使用头部建议,例如 x-power-by:openresty/1.13.6.1
acceptCount int 如果所有服务于连接的线程都已经被使用了,连接无可用线程时,会存放到这个队列里面,默认值是100
acceptorThreadCount int 默认值是1,如果是多核CPU需要增加这个值,如果有很多非长连接时,通常需要增加这个值

– 未完待续

Fork And Join Task

之前执行异步任务,等待异步结果使用的是Executors.newFixedThreadPool初始化线程连接池。将业务逻辑封装到Callable接口中,使用Future.get()阻塞
主线程来等待所有线程执行完毕。jdk1.7之后已经可以使用另一种方式来优雅的处理这种情况。

ForkJoinPool

当一个任务 需要拆分成2个及2个以上时,可以通过使用ForkJoinPool的execute方法执行ForkJoinTask执行,同样也支持子任务拆分子任务,通fork一样无线fork下去,最后所有子任务向上join到开始任务上。

ForkJoinPool中pool的大小默认采用cpu的核数,相应的可以根据业务量来调整,建议使用cpu的核心数,这样可以充分利用多核性能。

参照并发编程网 http://ifeve.com/fork-join-2/

实现了一个简单任务拆分的例子

    package cn.db.forkJoin;

    /**
     * @author dongbin.yu
     * @from 2018-09-15
     * @since V1.0
     */
    public class Product {

        private String name;

        private double price;

        public Product(String name, double price) {
            this.name = name;
            this.price = price;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }


商品类

    package cn.db.forkJoin;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    /**
     * @author dongbin.yu
     * @from 2018-09-15
     * @since V1.0
     */
    public class ProductTask extends RecursiveAction {

        private int first;

        private int last;

        private List<Product> productList;

        public ProductTask(int first, int last, List<Product> productList) {
            this.first = first;
            this.last = last;
            this.productList = productList;
        }

        /**
         * 编写业务逻辑
         */
        @Override
        protected void compute() {

            if ((last - first) > 10) {
                int middle = (last + first) / 2;
                System.out.println("thread queue size: " + getQueuedTaskCount());
                invokeAll(new ProductTask(first, middle + 1, productList),new ProductTask(middle + 1, last, productList));
            } else {
                updatePrice();
            }

        }

        private void updatePrice() {
            for (int i = first; i < last; i++) {
                Product product = productList.get(i);
                product.setPrice(2.5 * i);
            }
        }

        public static void main(String[] args) throws InterruptedException {

            long start = System.currentTimeMillis();
            ForkJoinPool pool = new ForkJoinPool();
            List<Product> productList = new ArrayList<>();
            for (int i = 0; i < 2000; i++) {
                productList.add(new Product("product" + i, 1.0));
            }
            ProductTask productTask = new ProductTask(0, 2000, productList);
            pool.execute(productTask);

            while (!productTask.isDone()) {
                System.out.println("active count " + pool.getActiveThreadCount());
                System.out.println("steal count " + pool.getStealCount());
                System.out.println("parallelism count " + pool.getParallelism());

                Thread.sleep(2);
            }

            pool.shutdown();
            System.out.println("cost time: " + (System.currentTimeMillis() - start));

        }

    }


Task类,如果不需要汇总结果可以继承RecursiveAction类,需要汇总结果时请继承RecursiveTask类,compute方法中写入自己的业务逻辑代码,相应的也可以根据业务拆分子任务。

并发工具类

Semaphore

是一个分离的信号量计数器

主要核心方法有两个:

  • acquire

从信号队列里面获取一个信号量,如果没有获取到则会一直阻塞到获取到信号量为止

  • release

释放一个信号量回队列,这种模型相对来说比较适合池这种类型的业务需求

acquire(int permits) 一次获取多个信号量

acquireUninterruptibly() 连续获取一个信号量

acquireUninterruptibly(int permits) 连续获得多个信号量

适合连接池的数量控制,维护一个池的总量

CountDownLatch

  • await() throws InterruptedException 调用该方法的线程等到构造方法传入的N减到0的时候,才能继续往下执行

  • await(long timeout, TimeUnit unit) 超时设置

  • countDown() 计数器减1

  • getCount()

适合等待一堆线程完成任务后,在执行主线程任务。

CyclicBarrier

  • await() 使异步线程等待

  • await(long timeout, TimeUnit unit) 超时设置

  • getNumberWaiting() 返回哪些异步线程已经处于等待状态

  • isBroken() 查询等待线程是否已经被中断

  • reset() 如果所有线程都处于等待状态,执行完后续的操作后,可以重置 继续使用

除了默认构造方法,还可以传入一个Runnable任务,执行

public CyclicBarrier(int parties, Runnable barrierAction)

可以执行一些回调方法

上面countDownLatch的功能也可以用这个来实现,在异步线程的最后面调用await方法,可以做到让异步线程同时开始一件事情