首页数据库多线程操作数据库(要疯了,怎样用多线程向MYSQL数据库中写入数据)

多线程操作数据库(要疯了,怎样用多线程向MYSQL数据库中写入数据)

编程之家2023-10-21116次浏览

大家好,今天来为大家分享多线程操作数据库的一些知识点,和要疯了,怎样用多线程向MYSQL数据库中写入数据的问题解析,大家要是都明白,那么可以忽略,如果不太清楚的话可以看看本篇文章,相信很大概率可以解决您的问题,接下来我们就一起来看看吧!

多线程操作数据库(要疯了,怎样用多线程向MYSQL数据库中写入数据)

java怎么多线程操作一个数据库

//将数据库中的数据条数分段

public void division(){

//获取要导入的总的数据条数

String sql3="SELECT count(*) FROM [CMD].[dbo].[mycopy1]";

try{

pss=cons.prepareStatement(sql3);

多线程操作数据库(要疯了,怎样用多线程向MYSQL数据库中写入数据)

rss=pss.executeQuery();

while(rss.next()){

System.out.println("总记录条数:"+rss.getInt(1));

sum=rss.getInt(1);

}

多线程操作数据库(要疯了,怎样用多线程向MYSQL数据库中写入数据)

//每30000条记录作为一个分割点

if(sum>=30000){

n=sum/30000;

residue=sum%30000;

}else{

residue=sum;

}

System.out.println(n+""+residue);

} catch(SQLException e){

// TODO Auto-generated catch block

e.printStackTrace();

}

}

线程类

public MyThread(int start,int end){

this.end=end;

this.start=start;

System.out.println("处理掉余数");

try{

System.out.println("--------"+Thread.currentThread().getName()+"------------");

Class.forName(SQLSERVERDRIVER);

System.out.println("加载sqlserver驱动...");

cons= DriverManager.getConnection(CONTENTS,UNS,UPS);

stas= cons.createStatement();

System.out.println("连接SQLServer数据库成功!!");

System.out.println("加载mysql驱动.....");

Class.forName(MYSQLDRIVER);

con= DriverManager.getConnection(CONTENT,UN,UP);

sta= con.createStatement();

//关闭事务自动提交

con.setAutoCommit(false);

System.out.println("连接mysql数据库成功!!");

} catch(Exception e){

e.printStackTrace();

}

// TODO Auto-generated constructor stub

}

public ArrayList<Member> getAll(){

Member member;

String sql1="select* from(select row_number() over(order by pmcode) as rowNum,*"+

" from [CMD].[dbo].[mycopy1]) as t where rowNum between"+start+" and"+end;

try{

System.out.println("正在获取数据...");

allmembers=new ArrayList();

rss=stas.executeQuery(sql1);

while(rss.next()){

member=new Member();

member.setAddress1(rss.getString("address1"));

member.setBnpoints(rss.getString("bnpoints"));

member.setDbno(rss.getString("dbno"));

member.setExpiry(rss.getString("expiry"));

member.setHispoints(rss.getString("hispoints"));

member.setKypoints(rss.getString("kypoints"));

member.setLevels(rss.getString("levels"));

member.setNames(rss.getString("names"));

member.setPmcode(rss.getString("pmcode"));

member.setRemark(rss.getString("remark"));

member.setSex(rss.getString("sex"));

member.setTelephone(rss.getString("telephone"));

member.setWxno(rss.getString("wxno"));

member.setPmdate(rss.getString("pmdate"));

allmembers.add(member);

// System.out.println(member.getNames());

}

System.out.println("成功获取sqlserver数据库数据!");

return allmembers;

} catch(SQLException e){

// TODO Auto-generated catch block

System.out.println("获取sqlserver数据库数据发送异常!");

e.printStackTrace();

}

try{

rss.close();

stas.close();

} catch(SQLException e){

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

public void inputAll(ArrayList<Member> allmembers){

System.out.println("开始向mysql中写入");

String sql2="insert into test.mycopy2 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

try{

ps=con.prepareStatement(sql2);

System.out.println("-------------------------等待写入数据条数:"+allmembers.size());

for(int i=0;i<allmembers.size();i++){

ps.setString(1, allmembers.get(i).getPmcode());

ps.setString(2, allmembers.get(i).getNames());

//System.out.println(allmembers.get(i).getNames());

ps.setString(3, allmembers.get(i).getSex());

ps.setString(4, allmembers.get(i).getTelephone());

ps.setString(5, allmembers.get(i).getAddress1());

ps.setString(6, allmembers.get(i).getPmdate());

ps.setString(7, allmembers.get(i).getExpiry());

ps.setString(8, allmembers.get(i).getLevels());

ps.setString(9, allmembers.get(i).getDbno());

ps.setString(10, allmembers.get(i).getHispoints());

ps.setString(11, allmembers.get(i).getBnpoints());

ps.setString(12, allmembers.get(i).getKypoints());

ps.setString(13, allmembers.get(i).getWxno());

ps.setString(14, allmembers.get(i).getRemark());

//插入命令列表

//ps.addBatch();

ps.executeUpdate();

}

//ps.executeBatch();

con.commit();

ps.close();

con.close();

this.flag=false;

System.out.println(Thread.currentThread().getName()+"--->OK");

} catch(SQLException e){

// TODO Auto-generated catch block

System.out.println("向mysql中更新数据时发生异常!");

e.printStackTrace();

}

}

@Override

public void run(){

// TODO Auto-generated method stub

while(true&&flag){

this.inputAll(getAll());

}

}

java多线程更新数据库批量的数据信息吗怎么实现

//将数据库中的数据条数分段

public void division(){

//获取要导入的总的数据条数

String sql3="SELECT count(*) FROM [CMD].[dbo].[mycopy1]";

try{

pss=cons.prepareStatement(sql3);

rss=pss.executeQuery();

while(rss.next()){

System.out.println("总记录条数:"+rss.getInt(1));

sum=rss.getInt(1);

}

//每30000条记录作为一个分割点

if(sum>=30000){

n=sum/30000;

residue=sum%30000;

}else{

residue=sum;

}

System.out.println(n+""+residue);

} catch(SQLException e){

// TODO Auto-generated catch block

e.printStackTrace();

}

}

线程类

public MyThread(int start,int end){

this.end=end;

this.start=start;

System.out.println("处理掉余数");

try{

System.out.println("--------"+Thread.currentThread().getName()+"------------");

Class.forName(SQLSERVERDRIVER);

System.out.println("加载sqlserver驱动...");

cons= DriverManager.getConnection(CONTENTS,UNS,UPS);

stas= cons.createStatement();

System.out.println("连接SQLServer数据库成功!!");

System.out.println("加载mysql驱动.....");

Class.forName(MYSQLDRIVER);

con= DriverManager.getConnection(CONTENT,UN,UP);

sta= con.createStatement();

//关闭事务自动提交

con.setAutoCommit(false);

System.out.println("连接mysql数据库成功!!");

} catch(Exception e){

e.printStackTrace();

}

// TODO Auto-generated constructor stub

}

public ArrayList<Member> getAll(){

Member member;

String sql1="select* from(select row_number() over(order by pmcode) as rowNum,*"+

" from [CMD].[dbo].[mycopy1]) as t where rowNum between"+start+" and"+end;

try{

System.out.println("正在获取数据...");

allmembers=new ArrayList();

rss=stas.executeQuery(sql1);

while(rss.next()){

member=new Member();

member.setAddress1(rss.getString("address1"));

member.setBnpoints(rss.getString("bnpoints"));

member.setDbno(rss.getString("dbno"));

member.setExpiry(rss.getString("expiry"));

member.setHispoints(rss.getString("hispoints"));

member.setKypoints(rss.getString("kypoints"));

member.setLevels(rss.getString("levels"));

member.setNames(rss.getString("names"));

member.setPmcode(rss.getString("pmcode"));

member.setRemark(rss.getString("remark"));

member.setSex(rss.getString("sex"));

member.setTelephone(rss.getString("telephone"));

member.setWxno(rss.getString("wxno"));

member.setPmdate(rss.getString("pmdate"));

allmembers.add(member);

// System.out.println(member.getNames());

}

System.out.println("成功获取sqlserver数据库数据!");

return allmembers;

} catch(SQLException e){

// TODO Auto-generated catch block

System.out.println("获取sqlserver数据库数据发送异常!");

e.printStackTrace();

}

try{

rss.close();

stas.close();

} catch(SQLException e){

// TODO Auto-generated catch block

e.printStackTrace();

}

return null;

}

public void inputAll(ArrayList<Member> allmembers){

System.out.println("开始向mysql中写入");

String sql2="insert into test.mycopy2 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

try{

ps=con.prepareStatement(sql2);

System.out.println("-------------------------等待写入数据条数:"+allmembers.size());

for(int i=0;i<allmembers.size();i++){

ps.setString(1, allmembers.get(i).getPmcode());

ps.setString(2, allmembers.get(i).getNames());

//System.out.println(allmembers.get(i).getNames());

ps.setString(3, allmembers.get(i).getSex());

ps.setString(4, allmembers.get(i).getTelephone());

ps.setString(5, allmembers.get(i).getAddress1());

ps.setString(6, allmembers.get(i).getPmdate());

ps.setString(7, allmembers.get(i).getExpiry());

ps.setString(8, allmembers.get(i).getLevels());

ps.setString(9, allmembers.get(i).getDbno());

ps.setString(10, allmembers.get(i).getHispoints());

ps.setString(11, allmembers.get(i).getBnpoints());

ps.setString(12, allmembers.get(i).getKypoints());

ps.setString(13, allmembers.get(i).getWxno());

ps.setString(14, allmembers.get(i).getRemark());

//插入命令列表

//ps.addBatch();

ps.executeUpdate();

}

//ps.executeBatch();

con.commit();

ps.close();

con.close();

this.flag=false;

System.out.println(Thread.currentThread().getName()+"--->OK");

} catch(SQLException e){

// TODO Auto-generated catch block

System.out.println("向mysql中更新数据时发生异常!");

e.printStackTrace();

}

}

@Override

public void run(){

// TODO Auto-generated method stub

while(true&&flag){

this.inputAll(getAll());

}

}

要疯了,怎样用多线程向MYSQL数据库中写入数据

在MySQL 8.0之前,我们假设一下有一条烂SQL,

mysqlselect* from t1 order by rand();

以多个线程在跑,导致CPU被跑满了,其他的请求只能被阻塞进不来。那这种情况怎么办?

大概有以下几种解决办法:

设置max_execution_time来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL掉。有些必须要执行很长时间的也会被误杀。

自己写个脚本检测这类语句,比如order by rand(),超过一定时间用Kill query thread_id给杀掉。

那能不能不要杀掉而让他正常运行,但是又不影响其他的请求呢?

那mysql 8.0引入的资源组(resource group,后面简写微RG)可以基本上解决这类问题。

比如我可以用 RG来在SQL层面给他限制在特定的一个CPU核上,这样我就不管他,让他继续运行,如果有新的此类语句,让他排队好了。

为什么说基本呢?目前只能绑定CPU资源,其他的暂时不行。

那我来演示下如何使用RG。

创建一个资源组user_ytt.这里解释下各个参数的含义,

type= user表示这是一个用户态线程,也就是前台的请求线程。如果type=system,表示后台线程,用来限制mysql自己的线程,比如Innodb purge thread,innodb read thread等等。

vcpu代表cpu的逻辑核数,这里0-1代表前两个核被绑定到这个RG。可以用lscpu,top等列出自己的CPU相关信息。

thread_priority设置优先级。user级优先级设置大于0。

mysqlmysql> create resource group user_ytt type= user vcpu= 0-1 thread_priority=19 enable;Query OK, 0 rows affected(0.03 sec)

RG相关信息可以从 information_schema.resource_groups系统表里检索。

mysqlmysql> select* from information_schema.resource_groups;+---------------------+---------------------+------------------------+----------+-----------------+| RESOURCE_GROUP_NAME| RESOURCE_GROUP_TYPE| RESOURCE_GROUP_ENABLED| VCPU_IDS| THREAD_PRIORITY|+---------------------+---------------------+------------------------+----------+-----------------+| USR_default| USER| 1| 0-3| 0|| SYS_default| SYSTEM| 1| 0-3| 0|| user_ytt| USER| 1| 0-1| 19|+---------------------+---------------------+------------------------+----------+-----------------+3 rows in set(0.00 sec)

我们来给语句select guid from t1 group by left(guid,8) order by rand()赋予RG user_ytt。

mysql> show processlist;+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| Id| User| Host| db| Command| Time| State| Info|+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| 4| event_scheduler| localhost| NULL| Daemon| 10179| Waiting on empty queue| NULL|| 240| root| localhost| ytt| Query| 101| Creating sort index| select guid from t1 group by left(guid,8) order by rand()|| 245| root| localhost| ytt| Query| 0| starting| show processlist|+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+3 rows in set(0.00 sec)

找到连接240对应的thread_id。

mysqlmysql> select thread_id from performance_schema.threads where processlist_id= 240;+-----------+| thread_id|+-----------+| 278|+-----------+1 row in set(0.00 sec)

给这个线程278赋予RG user_ytt。没报错就算成功了。

mysqlmysql> set resource group user_ytt for 278;Query OK, 0 rows affected(0.00 sec)

当然这个是在运维层面来做的,我们也可以在开发层面结合 MYSQL HINT来单独给这个语句赋予RG。比如:

mysqlmysql> select/*+ resource_group(user_ytt)*/guid from t1 group by left(guid,8) order by rand()....8388602 rows in set(4 min 46.09 sec)

RG的限制:

Linux平台上需要开启 CAPSYSNICE特性。比如我机器上用systemd给mysql服务加上

systemctl edit mysql@80 [Service]AmbientCapabilities=CAP_SYS_NICE

mysql线程池开启后RG失效。

freebsd,solaris平台thread_priority失效。

目前只能绑定CPU,不能绑定其他资源。

好了,文章到此结束,希望可以帮助到大家。

sql数据库语言(数据库语言有哪些)人口数据库(全国人口信息库怎样杳)