数据库线程池,啥时候会使用线程池
大家好,今天给各位分享数据库线程池的一些知识,其中也会对啥时候会使用线程池进行解释,文章篇幅可能偏长,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在就马上开始吧!
你真的知道如何设置数据库连接池的大小吗
前段时间在一个老项目中经历过一个问题:一个 Dubbo服务,启动的时候慢的要死,后来看日志查原因整个过程一直在初始化数据库连接。一看数据库连接参数,连接池大小:1024。
很多入行晚的同学没有经历过手写 JDBC连接的日子。那个时候没有数据库连接池的概念,都是原生代码一顿搞,后来有了 iBATIS之后 Java开发的繁杂程度才逐渐减轻,也衍生 C3P0数据库连接池这种基础的东西。罗马不是一天建成的,可是互联网发展太快了,技术压力逼迫下各种中间件被迫研发,大家加班加点搞出来各种高大上的脚手架,也成就很多伟人。
数据库连接使用 TCP的方式,建立连接需要3次握手,释放连接需要4次挥手,当今这种互联网使用频率下,如果每一次访问数据库都重新建立连接,我估计你们公司倒闭800次都不够。
1.数据库连接的过程是怎样的
Java鼻祖 Sun公司是想以一套API统一天下,奈何各个数据库服务器厂商太给力统一不了。无奈之举是创建了一个统一的接口,提出一套统一接入的步骤,各个厂商实现接口,按照步骤加载自己的数据库。所以现在的方案就是4板斧:
注册驱动,为人所知的:Class.forName();
获取Connection,成功即与数据库建立连接;
拿到Statement对象,用于操作数据库的CRUD;
获取数据库返回结果ResultSet。
大家应该都知道数据库本身是一个客户端程序,只有启动了才能连接。拿 MYSQL举例,我们在安装并启动了服务的机器上,命令行的方式输入:mysql-uroot-p即可连接当前数据库。
MYSQL连接方式有很多种,区分Unix系统和 Windows系统以及通用的连接方式,在这里仅说两种方式:一种为 unix domain socket,另外一种为基于 tcp/ip协议,一般我们如果远程访问数据库肯定是基于 tcp/ip的,但是如果我们在本机登录就会分为使用 socket还是 tcp/ip。
socket:mysql-uroot-p
tcp/ip:mysql-h127.0.0.1-uroot-p
当数据库服务器和应用服务器位于不同的主机时就要使用 tcp/ip的方式建立连接。每一个连接在操作系统中占用一个线程来维护。建立连接也分为两类:短连接和长连接。
短连接
所谓短连接就是指应用程序和数据库通信完毕之后连接关闭。这种连接每次的操作就是:
发出请求--->建立连接--->操作数据--->释放连接
这样做的问题是:
频繁的建立/释放连接对数据库来说增加了系统负担;
应用程序每次操作数据库的过程将会变得很慢;
应用系统每次建立连接都要占用一个端口,频繁的建立/释放,每个被释放的连接在发出释放请求之后并不是马上就执行,必须经历一个 FIN阶段的等待直到确认为止。所以在每秒几千次数据库请求的时候,应用服务器端口很有可能被消耗完。
长连接
长连接即在建立连接后一直打开,直到应用程序关闭才释放。使用长连接的好处是减少每次创建连接带来的开销。
对于应用服务器来说维持长连接的好处不言自明,但是对于数据库服务器来说,过多的长连接则是灾难。
MYSQL的TCP连接支持长连接,所以每次操作完数据库,可以不必直接关掉连接,而是等待下次使用的时候在复用这个连接。所有的Socket长连接都是通过TCP自带的ping来维持心跳(TCP保活),从而保持连接状态,而我们熟悉的websocket,也正是通过TCP的心跳来维持连接不被中断。
连接池
长连接的好处这么大,自然大家都用长连接。慢慢就搞出一套长连接维护的工具-数据库连接池。
设计连接池也没有多么复杂,大致的步骤就是:
初始化连接;
业务取出连接;
业务发送请求;
放回连接。
除了上面的基本功能以外,还要处理并发问题,多数据库服务器和多用户,事务处理,连接池的配置与维护。大概就这些功能。有了连接池之后,连接的建立和释放跟业务就没有关系,交给交接池来维护。
2. MYSQL能支持多少连接
MYSQL的最大连接数在5.7版本中默认是151,最大可以达到16384(2^14)。如何设置最大连接数在于你的服务器性能,查看 MYSQL连接数信息命令如下:
mysql> show variables like'%max_connections%';
+-----------------+-------+
| Variable_name| Value|
+-----------------+-------+
| max_connections| 5050|
+-----------------+-------+
1 row in set(0.00 sec)
我们生产环境MYSQL的最大连接数设置为 5050,注意不能设置的太小,太小造成的后果是连接失败:“query failed Error 1040: Too many connections“错误。太大且当连接该数据库的机器比较多的时候则会对当前MYSQL的性能产生影响。
MYSQL官网给出了一个设置最大连接数的建议比例:
Max_used_connections/ max_connections* 100%≈ 85%
即已使用的连接数占总上限的85%左右,如果目前已使用的连接数与最大连接数比例小于10%那很显然设置的过大。
查询当前数据库已建立连接数:
mysql> show status like'Threads_connected';
+-------------------+-------+
| Variable_name| Value|
+-------------------+-------+
| Threads_connected| 89|
+-------------------+-------+
1 row in set(0.00 sec)
Mysql的配置可以在全局变量中查询和设置,相关的配置主要可以查询下面这些:
配置
含义
Connections
尝试连接Mysql的连接数,不管连接成功与否,该值都会+1
Threads_connected
已经建立的连接数,单节点下一般小于最大连接池最大连接数
max_connections
Mysql限制的最大的可连接的数量
wait_timeout
即MYSQL长连接(非交互式)的最大生命时长,默认是8小时
interactive_timeout
长连接(交互式)的最大生命时长,默认是8小时
3.连接池设置多少连接才合适
设置连接池的大小肯定不是越大越好,需要考虑的是当前服务所在机器的性能,网络状况,数据库机器性能,数据库特性等等。同时也要做到不浪费系统资源,内存,端口,同步信号量等等。
比如说应用服务器Tomcat设置的最大线程池缺省值200,最大假设每个线程会用到一个数据库连接,那么线程池大小应该小于等于200。
另外需要考虑的是,每申请一个长连接都会在物理网络上建立一个用于长连接维护的进程,而进程的执行跟物理机的CPU核数有关。理论上一个8核的服务器将连接池设置为8最佳,每一个核同时处理一个线程,超过8的并发就有线程上下文切换的开销。
这里有一个 Oracle性能小组发布的简短视频,连接池测试分2个部分:测试视频1,测试视频2。视频中调整了线程池大小为2048的时候数据库性能陡然下降,后面调整到144就恢复了。PostgreSQL提供了一个设置预期线程池大小的公式:
connections=((core_count* 2)+ effective_spindle_count)
该公式来自于:https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing。
其中,core_count是CPU核心, effective_spindle_count的含义是有效主轴数,如果你的服务器使用的是带有16个磁盘的RAID,那么valid_spindle_count=16。它实质上是服务器可以管理多少个并行I/ O请求的度量。旋转硬盘一次(通常)一次只能处理一个I/ O请求,如果你有16个,则系统可以同时处理16个I/ O请求。
我想 Hikari作为目前最优秀的数据库连接池之一,提出的这个公式还是经得起检验的。大家不妨在生产环境试试(出问题别找我)。
你真的知道如何设置数据库连接池的大小吗
标签:加班The带来脚手架过多功能短连接建立连接复杂
啥时候会使用线程池
编者注:Java中的线程池是运用场景最多的并发组件,几乎所有需要异步或并发执行任务的程序都可以使用线程池。
在开发过程中,合理地使用线程池能够带来至少以下几个好处。
降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须了解其实现原理。
代码解耦:比如生产者消费者模式。
线程池实现原理
当提交一个新任务到线程池时,线程池的处理流程如下:
如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤也需要获取全局锁)。
如果创建新线程将使当前运行的线程数超出maximumPoolSize,该任务将被拒绝,并调用相应的拒绝策略来处理(RejectedExecutionHandler.rejectedExecution()方法,线程池默认的饱和策略是AbortPolicy,也就是抛异常)。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
线程池任务拒绝策略包括抛异常、直接丢弃、丢弃队列中最老的任务、将任务分发给调用线程处理。
线程池的创建:通过ThreadPoolExecutor来创建一个线程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, runnableTaskQueue, handler);
创建一个线程池时需要输入以下几个参数:
corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到线程池的线程数等于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级的无界阻塞队列。
线程的状态
在HotSpot VM线程模型中,Java线程被一对一映射到本地系统线程,Java线程启动时会创建一个本地系统线程;当Java线程终止时,这个本地系统线程也会被回收。操作系统调度所有线程并把它们分配给可用的CPU。
thread运行周期中,有以下6种状态,在 java.lang.Thread.State中有详细定义和说明:
// Thread类
public enum State{
/**
*刚创建尚未运行
*/
NEW,
/**
*可运行状态,该状态表示正在JVM中处于运行状态,不过有可能是在等待其他资源,比如CPU时间片,IO等待
*/
RUNNABLE,
/**
*阻塞状态表示等待monitor锁(阻塞在等待monitor锁或者在调用Object.wait方法后重新进入synchronized块时阻塞)
*/
BLOCKED,
/**
*等待状态,发生在调用Object.wait、Thread.join(with no timeout)、LockSupport.park
*表示当前线程在等待另一个线程执行某种动作,比如Object.notify()、Object.notifyAll(),Thread.join表示等待线程执行完成
*/
WAITING,
/**
*超时等待,发生在调用Thread.sleep、Object.wait、Thread.join(in timeout)、LockSupport.parkNanos、LockSupport.parkUntil
*/
TIMED_WAITING,
/**
*线程已执行完成,终止状态
*/
TERMINATED;
}
线程池操作
向线程池提交任务,可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute()方法输入的任务是一个Runnable类的实例。
threadsPool.execute(new Runnable(){
@Override
public void run(){
// TODO Auto-generated method stub
}
});
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,通过future的get()方法来获取返回值,future的get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务还没有执行完。
Future<Object> future= executor.submit(harReturnValuetask);
try{
Object s= future.get();
} catch(InterruptedException e){
//处理中断异常
} catch(ExecutionException e){
//处理无法执行任务异常
} finally{
//关闭线程池
executor.shutdown();
}
合理配置线程池
要想合理配置线程池,必须先分析任务的特点,可以从以下几个角度分析:
任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
任务的优先级:高、中和低。
任务的执行时间:长、中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能少的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置多一点线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
线程池中线程数量未达到coreSize时,这些线程处于什么状态?
这些线程处于RUNNING或者WAITING,RUNNING表示线程处于运行当中,WAITING表示线程阻塞等待在阻塞队列上。当一个task submit给线程池时,如果当前线程池线程数量还未达到coreSize时,会创建线程执行task,否则将任务提交给阻塞队列,然后触发线程执行。(从submit内部调用的代码也可以看出来)
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。
ScheduledThreadPoolExecutor的功能与Timer类似,但
ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而
ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,
ScheduledThreadPoolExecutor和ThreadPoolExecutor的区别是,ThreadPoolExecutor获取任务时是从BlockingQueue中获取的,而
ScheduledThreadPoolExecutor是从DelayedWorkQueue中获取的(注意,DelayedWorkQueue是BlockingQueue的实现类)。
ScheduledThreadPoolExecutor把待调度的任务(ScheduledFutureTask)放到一个DelayQueue中,其中ScheduledFutureTask主要包含3个成员变量:
sequenceNumber:任务被添加到ScheduledThreadPoolExecutor中的序号;
time:任务将要被执行的具体时间;
period:任务执行的间隔周期。
ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序比较算法实现如下:
ScheduledFutureTask在DelayQueue中被保存在一个PriorityQueue(基于数组实现的优先队列,类似于堆排序中的优先队列)中,在往数组中添加/移除元素时,会调用siftDown/siftUp来进行元素的重排序,保证元素的优先级顺序。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable>{
private static final int INITIAL_CAPACITY= 16;
private RunnableScheduledFuture<?>[] queue=
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock= new ReentrantLock();
private int size= 0;
private Thread leader= null;
private final Condition available= lock.newCondition();
}
从DelayQueue获取任务的主要逻辑就在take()方法中,首先获取lock,然后获取queue[0],如果为null则await等待任务的来临,如果非null查看任务是否到期,是的话就执行该任务,否则再次await等待。这里有一个leader变量,用来表示当前进行awaitNanos等待的线程,如果leader非null,表示已经有其他线程在进行awaitNanos等待,自己await等待,否则自己进行awaitNanos等待。
// DelayedWorkQueue
public RunnableScheduledFuture<?> take() throws InterruptedException{
final ReentrantLock lock= this.lock;
lock.lockInterruptibly();
try{
for(;;){
RunnableScheduledFuture<?> first= queue[0];
if(first== null)
available.await();
else{
long delay= first.getDelay(NANOSECONDS);
if(delay<= 0)
return finishPoll(first);
first= null;// don't retain ref while waiting
if(leader!= null)
available.await();
else{
Thread thisThread= Thread.currentThread();
leader= thisThread;
try{
available.awaitNanos(delay);
} finally{
if(leader== thisThread)
leader= null;
}
}
}
}
} finally{
if(leader== null&& queue[0]!= null)
available.signal();
lock.unlock();
}
}
获取到任务之后,就会执行task的run()方法了,即ScheduledFutureTask.run():
public void run(){
boolean periodic= isPeriodic();
if(!canRunInCurrentRunState(periodic))
cancel(false);
else if(!periodic)
ScheduledFutureTask.super.run();
else if(ScheduledFutureTask.super.runAndReset()){
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
推荐阅读:
JMM Java内存模型
happens-before那些事儿
为什么说LockSupport是Java并发的基石?
责任链的2种实现方式,你更pick哪一种
2阅读
请问mysql数据库是不能多线程写入吗
在MySQL 8.0之前,我们假设一下有一条烂SQL,
mysqlselect* from t1 order by rand();
以多个线程在跑,导致CPU被跑满了,其他的请求只能被阻塞进不来。那这种情况怎么办?
大概有以下几种解决办法:
设置max_execution_time来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL掉。有些必须要执行很长时间的也会被误杀。
自己写个脚本检测这类语句,比如order by rand(),超过一定时间用Kill query thread_id给杀掉。
那能不能不要杀掉而让他正常运行,但是又不影响其他的请求呢?
那mysql 8.0引入的资源组(resource group,后面简写微RG)可以基本上解决这类问题。
比如我可以用 RG来在SQL层面给他限制在特定的一个CPU核上,这样我就不管他,让他继续运行,如果有新的此类语句,让他排队好了。
为什么说基本呢?目前只能绑定CPU资源,其他的暂时不行。
那我来演示下如何使用RG。
创建一个资源组user_ytt.这里解释下各个参数的含义,
type= user表示这是一个用户态线程,也就是前台的请求线程。如果type=system,表示后台线程,用来限制mysql自己的线程,比如Innodb purge thread,innodb read thread等等。
vcpu代表cpu的逻辑核数,这里0-1代表前两个核被绑定到这个RG。可以用lscpu,top等列出自己的CPU相关信息。
thread_priority设置优先级。user级优先级设置大于0。
mysqlmysql> create resource group user_ytt type= user vcpu= 0-1 thread_priority=19 enable;Query OK, 0 rows affected(0.03 sec)
RG相关信息可以从 information_schema.resource_groups系统表里检索。
mysqlmysql> select* from information_schema.resource_groups;+---------------------+---------------------+------------------------+----------+-----------------+| RESOURCE_GROUP_NAME| RESOURCE_GROUP_TYPE| RESOURCE_GROUP_ENABLED| VCPU_IDS| THREAD_PRIORITY|+---------------------+---------------------+------------------------+----------+-----------------+| USR_default| USER| 1| 0-3| 0|| SYS_default| SYSTEM| 1| 0-3| 0|| user_ytt| USER| 1| 0-1| 19|+---------------------+---------------------+------------------------+----------+-----------------+3 rows in set(0.00 sec)
我们来给语句select guid from t1 group by left(guid,8) order by rand()赋予RG user_ytt。
mysql> show processlist;+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| Id| User| Host| db| Command| Time| State| Info|+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| 4| event_scheduler| localhost| NULL| Daemon| 10179| Waiting on empty queue| NULL|| 240| root| localhost| ytt| Query| 101| Creating sort index| select guid from t1 group by left(guid,8) order by rand()|| 245| root| localhost| ytt| Query| 0| starting| show processlist|+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+3 rows in set(0.00 sec)
找到连接240对应的thread_id。
mysqlmysql> select thread_id from performance_schema.threads where processlist_id= 240;+-----------+| thread_id|+-----------+| 278|+-----------+1 row in set(0.00 sec)
给这个线程278赋予RG user_ytt。没报错就算成功了。
mysqlmysql> set resource group user_ytt for 278;Query OK, 0 rows affected(0.00 sec)
当然这个是在运维层面来做的,我们也可以在开发层面结合 MYSQL HINT来单独给这个语句赋予RG。比如:
mysqlmysql> select/*+ resource_group(user_ytt)*/guid from t1 group by left(guid,8) order by rand()....8388602 rows in set(4 min 46.09 sec)
RG的限制:
Linux平台上需要开启 CAPSYSNICE特性。比如我机器上用systemd给mysql服务加上
systemctl edit mysql@80 [Service]AmbientCapabilities=CAP_SYS_NICE
mysql线程池开启后RG失效。
freebsd,solaris平台thread_priority失效。
目前只能绑定CPU,不能绑定其他资源。
END,本文到此结束,如果可以帮助到大家,还望关注本站哦!