首页数据库kafka 数据库,软件开发中的Kafka和数据库的关系是什么呢

kafka 数据库,软件开发中的Kafka和数据库的关系是什么呢

编程之家2023-10-2094次浏览

这篇文章给大家聊聊关于kafka 数据库,以及软件开发中的Kafka和数据库的关系是什么呢对应的知识点,希望对各位有所帮助,不要忘了收藏本站哦。

kafka 数据库,软件开发中的Kafka和数据库的关系是什么呢

为什么使用kafka处理mysql binlog

在开发 Spark Streaming的公共组件过程中,需要将 binlog的数据(Array[Byte])转换为 Json格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果如果本文有讲述不详细,或者错误指出,肯请指出,谢谢对于 binlog数据,每一次操作(INSERT/UPDATE/DELETE等)都会作为一条记录写入 binlog文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)在数据库中,有 id, name两个字段,其中 id为主键,name随意, age随意。有两行数据如下idnameage

1john30

2john40

那么你进行操作

update table set age= 50 where name= john的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog记录中,这一点会在后面的实现中有体现。

下面,我们给出具体的代码,然后对代码进行分析def desirializeByte(b:(String, Array[Byte])):(String, String)={val binlogEntry= BinlogEntryUtil.serializeToBean(b._2)//将 Array[Byte]数据转换成 com.meituan.data.binlog.BinlogEntry类,相关类定义参考附录val pkeys= binlogEntry.getPrimaryKeys.asScala//获取主键,这里的 asScala将 Java的 List转换为 Scala的 Listval rowDatas: List[BinlogRow]= binlogEntry.getRowDatas.asScala.toList//获取具体的信息val strRowDatas= rowDatas.map(a=>{//将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(K1:V1,K2:V2...Kn:Vn)]的形式,方面后面进行 Json化val b= a.getBeforeColumns.asScala//获取 beforColumnsval c= a.getAfterColumns.asScala//获取 afterColumnsval mb= b.map(d=>(d._1, d._2.getValue))//去掉所有不需要的信息,只保留每个字段的值val mc= c.map(c=>(c._1, c._2.getValue))//去掉所有不需要的信息,只保留每个字段的值(mb, mc)//返回转换后的 beforeColumns和 afterColumns})

kafka 数据库,软件开发中的Kafka和数据库的关系是什么呢

//下面利用 json4s进行 Json化

(binlogEntry.getEventType, compact("rowdata"-> strRowDatas.map{w=> List("row_data"->("before"-> w._1.toMap)~("after"-> w._2.toMap))//这里的两个 toMap是必要的,不然里层会变成 List,这个地方比较疑惑的是,//w._1按理是 Map类型,为什么还需要强制转换成 Map//而且用 strRowDatas.foreach(x=> println(s"${x._1}${x._2}")打印的结果表名是 Map}))

desirializeByte函数传入 topic中的一条记录,返回参数自己确定,我这里为了测试,返回一个(String, String)的 Tuple,第一个字段表示该条记录的 EventType(Insert/Update/Delete等),第二个字段为 Json化后的数据。

BinlogEntryUtil.serilizeToBean是一个辅助类,将 binlog数据转化为一个 Java bean类。

第 4行,我们得到表对应的主键,第 5行获得具体的数据第 6行到第 12行是 Json化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。

第 14, 15行就是具体的 Json工作了(使用了 json4s包进行 Json化)这个过程中有一点需要注意的是,在 Json化的时候,记得为 w._1和 w._2加 toMap操作,不然会变成 List(很奇怪,我将 w._1和 w._2打印出来看,都是 Map类型)或者你可以在第 7,8行的末尾加上.toMap操作。这个我查了 API,进行了实验,暂时怀疑是在和 json4s组合的时候,出现了问题,有待验证。

kafka 数据库,软件开发中的Kafka和数据库的关系是什么呢

利用上述代码,我们可以得到下面这样 Json化之后的字符串(我进行了排版,程序返回的 Json串是不换行的){"rowdata":

[{"row_data":

{"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}

}

}]

}"

到这里,基本就完成了一种将 binlog数据 Json化的代码。

附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。

public static BinlogEntryserializeToBean(byte[] input){BinlogEntrybinlogEntry= null;

Entryentry= deserializeFromProtoBuf(input);//从 protobuf反序列化if(entry!= null){

binlogEntry= serializeToBean(entry);

}

return binlogEntry;

}

public static EntrydeserializeFromProtoBuf(byte[] input){Entryentry= null;

try{

entry= Entry.parseFrom(input);

//com.alibaba.otter.canal.protocol.CanalEntry#Entry类的方法,由 protobuf生成} catch(InvalidProtocolBufferExceptionvar3){logger.error("Exception:"+ var3);

}

return entry;

}

//将 Entry解析为一个 bean类

public static BinlogEntryserializeToBean(Entryentry){RowChangerowChange= null;

try{

rowChange= RowChange.parseFrom(entry.getStoreValue());} catch(Exceptionvar8){

throw new RuntimeException("parse event has an error, data:"+ entry.toString(), var8);}

BinlogEntrybinlogEntry= new BinlogEntry();String[] logFileNames= entry.getHeader().getLogfileName().split("\\.");String logFileNo="000000";

if(logFileNames.length> 1){

logFileNo= logFileNames[1];

}

binlogEntry.setBinlogFileName(logFileNo);binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());binlogEntry.setTableName(entry.getHeader().getTableName());binlogEntry.setEventType(entry.getHeader().getEventType().toString());IteratorprimaryKeysList= rowChange.getRowDatasList().iterator();while(primaryKeysList.hasNext()){

RowDatarowData=(RowData)primaryKeysList.next();BinlogRowrow= new BinlogRow(binlogEntry.getEventType());row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));binlogEntry.addRowData(row);

}

if(binlogEntry.getRowDatas().size()>= 1){BinlogRowprimaryKeysList1=(BinlogRow)binlogEntry.getRowDatas().get(0);binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));} else{

ArrayListprimaryKeysList2= new ArrayList();binlogEntry.setPrimaryKeys(primaryKeysList2);}

return binlogEntry;

}

public class BinlogEntry implements Serializable{private String binlogFileName;

private long binlogOffset;

private long executeTime;

private String tableName;

private String eventType;

private List<String> primaryKeys;

private List<BinlogRow> rowDatas= new ArrayList();}

public class BinlogRow implements Serializable{public static final String EVENT_TYPE_INSERT="INSERT";public static final String EVENT_TYPE_UPDATE="UPDATE";public static final String EVENT_TYPE_DELETE="DELETE";private String eventType;

private Map<String, BinlogColumn> beforeColumns;private Map<String, BinlogColumn> afterColumns;}

public class BinlogColumn implements Serializable{private int index;

private String mysqlType;

private String name;

private boolean isKey;

private boolean updated;

private boolean isNull;

private String value;

}

canal+Kafka实现mysql与redis数据同步

前言

上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更操作,就不得不在业务代码中写一段同步更新redis的代码,但是这种数据同步的代码和业务代码糅合在一起看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。

架构设计

通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

Kafka&Zookeeper搭建

首先在官网下载Kafka:

下载后解压文件夹,可以看到以下几个文件:

Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。

通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:

Canal搭建

canal搭建具体可以参考上文,这里只讲解具体的参数配置:

找到/conf目录下的canal.properties配置文件:

然后配置instance,找到/conf/example/instance.properties配置文件:

经过上述配置后,就可以启动canal了。

测试

环境搭建完成后,就可以编写代码进行测试。

1、引入pom依赖

2、封装Redis工具类

在application.yml文件增加以下配置:

封装一个操作Redis的工具类:

3、创建MQ消费者进行同步

创建一个CanalBean对象进行接收:

最后就可以创建一个消费者CanalConsumer进行消费:

测试Mysql与Redis同步

mysql对应的表结构如下:

启动项目后,新增一条数据:

可以在控制台看到以下输出:

如果更新呢?试一下Update语句:

同样可以在控制台看到以下输出:

经过测试完全么有问题。

既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等;尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过同步Mysql数据到Es采用Canal+Kafka的方式来实现的。

软件开发中的Kafka和数据库的关系是什么呢

首先明确说明Kafka不是数据库,它没有schema,也没有表,更没有索引。

1.它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。

那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID特性的,我们依次讨论下ACID。

1、持久性(durability)

我们先从最容易的持久性开始说起,因为持久性最容易理解。在80年代持久性指的是把数据写入到磁带中,这是一种很古老的存储设备,现在应该已经绝迹了。目前实现持久性更常见的做法是将数据写入到物理磁盘上,而这也只能实现单机的持久性。当演进到分布式系统时代后,持久性指的是将数据通过备份机制拷贝到多台机器的磁盘上。很多数据库厂商都有自己的分布式系统解决方案,如GreenPlum和Oracle RAC。它们都提供了这种多机备份的持久性。和它们类似,Apache Kafka天然也是支持这种持久性的,它提供的副本机制在实现原理上几乎和数据库厂商的方案是一样的。

2、原子性(atomicity)

数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Java中有AtomicInteger这样的类能够提供线程安全的整数操作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有操作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,

第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:

在这个架构中app仅仅是向Kafka写入消息,而下面的数据库、cache和index作为独立的consumer消费这个日志——Kafka分区的顺序性保证了app端更新操作的顺序性。如果某个consumer消费速度慢于其他consumer也没关系,毕竟消息依然在Kafka中保存着。有了Kafka所有的异质系统都能以相同的顺序应用app端的更新操作,

3、隔离性(isolation)

在传统的关系型数据库中最强的隔离级别通常是指serializability,国内一般翻译成可串行化或串行化。表达的思想就是连接数据库的每个客户端在执行各自的事务时数据库会给它们一个假象:仿佛每个客户端的事务都顺序执行的,即执行完一个事务之后再开始执行下一个事务。其实数据库端同时会处理多个事务,但serializability保证了它们就像单独执行一样。举个例子,在一个论坛系统中,每个新用户都需要注册一个唯一的用户名。一个简单的app实现逻辑大概是这样的:

4、一致性(consistency)

最后说说一致性。按照Kelppmann大神的原话,这是一个很奇怪的属性:在所有ACID特性中,其他三项特性的确属于数据库层面需要实现或保证的,但只有一致性是由用户来保证的。严格来说,它不属于数据库的特性,而应该属于使用数据库的一种方式。坦率说第一次听到这句话时我本人还是有点震惊的,因为从没有往这个方面考虑过,但仔细想想还真是这么回事。比如刚才的注册用户名的例子中我们要求每个用户名是唯一的。这种一致性约束是由我们用户做出的,而不是数据库本身。数据库本身并不关心或并不知道用户名是否应该是唯一的。针对Kafka而言,这种一致性又意味着什么呢?Kelppmann没有具体展开,

希望能帮到你,谢谢!

kafka 数据库的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于软件开发中的Kafka和数据库的关系是什么呢、kafka 数据库的信息别忘了在本站进行查找哦。

电脑游戏主机?游戏电脑主机配置怎么选择集中控制主机(空调中集中控制器是什么)