首页编程java编程zookeeper java api(java开发api接口如何编写)

zookeeper java api(java开发api接口如何编写)

编程之家2026-05-19642次浏览

本篇文章给大家谈谈zookeeper java api,以及java开发api接口如何编写对应的知识点,文章可能有点长,但是希望大家可以阅读完,增长自己的知识,最重要的是希望对各位有所帮助,可以解决了您的问题,不要忘了收藏本站喔。

zookeeper java api(java开发api接口如何编写)

zookeeper怎么实现分布式锁

1.利用节点名称的唯一性来实现共享锁

ZooKeeper抽象出来的节点结构是一个和unix文件系统类似的小型的树状的目录结构。ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名。例如:我们在Zookeeper目录/test目录下创建,两个客户端创建一个名为Lock节点,只有一个能够成功。

算法思路:利用名称唯一性,加锁操作时,只需要所有客户端一起创建/test/Lock节点,只有一个创建成功,成功者获得锁。解锁时,只需删除/test/Lock节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁。

基于以上机制,利用节点名称唯一性机制的共享锁算法流程如图所示:

该共享锁实现很符合我们通常多个线程去竞争锁的概念,利用节点名称唯一性的做法简明、可靠。

由上述算法容易看出,由于客户端会同时收到/test/Lock被删除的通知,重新进入竞争创建节点,故存在"惊群现象"。

zookeeper java api(java开发api接口如何编写)

使用该方法进行测试锁的性能列表如下:

总结这种方案的正确性和可靠性是ZooKeeper机制保证的,实现简单。缺点是会产生“惊群”效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁。

2.利用临时顺序节点实现共享锁的一般做法

首先介绍一下,Zookeeper中有一种节点叫做顺序节点,故名思议,假如我们在/lock/目录下创建节3个点,ZooKeeper集群会按照提起创建的顺序来创建节点,节点分别为/lock/0000000001、/lock/0000000002、/lock/0000000003。

ZooKeeper中还有一种名为临时节点的节点,临时节点由某个客户端创建,当客户端与ZooKeeper集群断开连接,则开节点自动被删除。

利用上面这两个特性,我们来看下获取实现分布式锁的基本逻辑:

zookeeper java api(java开发api接口如何编写)

客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。

客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。

客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。

如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。

上面这个分布式锁的实现中,大体能够满足了一般的分布式集群竞争锁的需求。这里说的一般性场景是指集群规模不大,一般在10台机器以内。

不过,细想上面的实现逻辑,我们很容易会发现一个问题,步骤4,“即获取所有的子点,判断自己创建的节点是否已经是序号最小的节点”,这个过程,在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知——这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,服务器就会像其余客户端发送大量的事件通知——这就是所谓的惊群效应。而这个问题的根源在于,没有找准客户端真正的关注点。

我们再来回顾一下上面的分布式锁竞争过程,它的核心逻辑在于:判断自己是否是所有节点中序号最小的。于是,很容易可以联想的到的是,每个节点的创建者只需要关注比自己序号小的那个节点。

3、利用临时顺序节点实现共享锁的改进实现

下面是改进后的分布式锁实现,和之前的实现方式唯一不同之处在于,这里设计成每个锁竞争者,只需要关注”locknode”节点下序号比自己小的那个节点是否存在即可。

算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。

对于解锁操作,只需要将自身创建的节点删除即可。

具体算法流程如下图所示:

使用上述算法进行测试的的结果如下表所示:

该算法只监控比自身创建节点序列号小(比自己小的最大的节点)的节点,在当前获得锁的节点释放锁的时候没有“惊群”。

总结利用临时顺序节点来实现分布式锁机制其实就是一种按照创建顺序排队的实现。这种方案效率高,避免了“惊群”效应,多个客户端共同等待锁,当锁释放时只有一个客户端会被唤醒。

4、使用menagerie

其实就是对方案3的一个封装,不用自己写代码了。直接拿来用就可以了。

menagerie基于Zookeeper实现了java.util.concurrent包的一个分布式版本。这个封装是更大粒度上对各种分布式一致性使用场景的抽象。其中最基础和常用的是一个分布式锁的实现: org.menagerie.locks.ReentrantZkLock,通过ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL类型znode的支持,实现了分布式锁。具体做法是:不同的client上每个试图获得锁的线程,都在相同的basepath下面创建一个EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要创建的是临时znode,创建连接断开时会自动删除; SEQUENTIAL表示要自动在传入的path后面缀上一个自增的全局唯一后缀,作为最终的path。因此对不同的请求ZK会生成不同的后缀,并分别返回带了各自后缀的path给各个请求。因为ZK全局有序的特性,不管client请求怎样先后到达,在ZKServer端都会最终排好一个顺序,因此自增后缀最小的那个子节点,就对应第一个到达ZK的有效请求。然后client读取basepath下的所有子节点和ZK返回给自己的path进行比较,当发现自己创建的sequential node的后缀序号排在第一个时,就认为自己获得了锁;否则的话,就认为自己没有获得锁。这时肯定是有其他并发的并且是没有断开的client/线程先创建了node。

Zookeeper在哪些系统中使用,又是怎么用的

ZooKeeper作为发现服务的问题 ZooKeeper(注:ZooKeeper是著名Hadoop的一个子项目,旨在解决大规模分布式应用场景下,服务协调同步(Coordinate Service)的问题;它可以为同在一个分布式系统中的其他服务提供:统一命名服务、配置管理、分布式锁服务、集群管理等功能)是个伟大的开源项目,它很成熟,有相当大的社区来支持它的发展,而且在生产环境得到了广泛的使用;但是用它来做Service发现服务解决方案则是个错误。在分布式系统领域有个著名的 CAP定理(C-数据一致性;A-服务可用性;P-服务对网络分区故障的容错性,这三个特性在任何分布式系统中不能同时满足,最多同时满足两个);ZooKeeper是个 CP的,即任何时刻对ZooKeeper的访问请求能得到一致的数据结果,同时系统对网络分割具备容错性;但是它不能保证每次服务请求的可用性(注:也就是在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要重新请求才能获得结果)。但是别忘了,ZooKeeper是分布式协调服务,它的职责是保证数据(注:配置数据,状态数据)在其管辖下的所有服务之间保持同步、一致;所以就不难理解为什么ZooKeeper被设计成CP而不是AP特性的了,如果是AP的,那么将会带来恐怖的后果(注:ZooKeeper就像交叉路口的信号灯一样,你能想象在交通要道突然信号灯失灵的情况吗?)。而且,作为ZooKeeper的核心实现算法 Zab,就是解决了分布式系统下数据如何在多个服务之间保持同步问题的。作为一个分布式协同服务,ZooKeeper非常好,但是对于Service发现服务来说就不合适了;因为对于Service发现服务来说就算是返回了包含不实的信息的结果也比什么都不返回要好;再者,对于Service发现服务而言,宁可返回某服务5分钟之前在哪几个服务器上可用的信息,也不能因为暂时的网络故障而找不到可用的服务器,而不返回任何结果。所以说,用ZooKeeper来做Service发现服务是肯定错误的,如果你这么用就惨了!而且更何况,如果被用作Service发现服务,ZooKeeper本身并没有正确的处理网络分割的问题;而在云端,网络分割问题跟其他类型的故障一样的确会发生;所以最好提前对这个问题做好100%的准备。就像 Jepsen在 ZooKeeper网站上发布的博客中所说:在ZooKeeper中,如果在同一个网络分区(partition)的节点数(nodes)数达不到 ZooKeeper选取Leader节点的“法定人数”时,它们就会从ZooKeeper中断开,当然同时也就不能提供Service发现服务了。如果给ZooKeeper加上客户端缓存(注:给ZooKeeper节点配上本地缓存)或者其他类似技术的话可以缓解ZooKeeper因为网络故障造成节点同步信息错误的问题。 Pinterest与 Airbnb公司就使用了这个方法来防止ZooKeeper故障发生。这种方式可以从表面上解决这个问题,具体地说,当部分或者所有节点跟ZooKeeper断开的情况下,每个节点还可以从本地缓存中获取到数据;但是,即便如此,ZooKeeper下所有节点不可能保证任何时候都能缓存所有的服务注册信息。如果 ZooKeeper下所有节点都断开了,或者集群中出现了网络分割的故障(注:由于交换机故障导致交换机底下的子网间不能互访);那么ZooKeeper会将它们都从自己管理范围中剔除出去,外界就不能访问到这些节点了,即便这些节点本身是“健康”的,可以正常提供服务的;所以导致到达这些节点的服务请求被丢失了。(注:这也是为什么ZooKeeper不满足CAP中A的原因)更深层次的原因是,ZooKeeper是按照CP原则构建的,也就是说它能保证每个节点的数据保持一致,而为ZooKeeper加上缓存的做法的目的是为了让ZooKeeper变得更加可靠(available);但是,ZooKeeper设计的本意是保持节点的数据一致,也就是CP。所以,这样一来,你可能既得不到一个数据一致的(CP)也得不到一个高可用的(AP)的Service发现服务了;因为,这相当于你在一个已有的CP系统上强制栓了一个AP的系统,这在本质上就行不通的!一个Service发现服务应该从一开始就被设计成高可用的才行!如果抛开CAP原理不管,正确的设置与维护ZooKeeper服务就非常的困难;错误会经常发生,导致很多工程被建立只是为了减轻维护ZooKeeper的难度。这些错误不仅存在与客户端而且还存在于ZooKeeper服务器本身。Knewton平台很多故障就是由于ZooKeeper使用不当而导致的。那些看似简单的操作,如:正确的重建观察者(reestablishing watcher)、客户端Session与异常的处理与在ZK窗口中管理内存都是非常容易导致ZooKeeper出错的。同时,我们确实也遇到过 ZooKeeper的一些经典bug: ZooKeeper-1159与 ZooKeeper-1576;我们甚至在生产环境中遇到过ZooKeeper选举Leader节点失败的情况。这些问题之所以会出现,在于ZooKeeper需要管理与保障所管辖服务群的Session与网络连接资源(注:这些资源的管理在分布式系统环境下是极其困难的);但是它不负责管理服务的发现,所以使用ZooKeeper当 Service发现服务得不偿失。做出正确的选择:Eureka的成功我们把Service发现服务从ZooKeeper切换到了Eureka平台,它是一个开源的服务发现解决方案,由Netflix公司开发。(注:Eureka由两个组件组成:Eureka服务器和Eureka客户端。Eureka服务器用作服务注册服务器。Eureka客户端是一个java客户端,用来简化与服务器的交互、作为轮询负载均衡器,并提供服务的故障切换支持。)Eureka一开始就被设计成高可用与可伸缩的Service发现服务,这两个特点也是Netflix公司开发所有平台的两个特色。(他们都在讨论Eureka)。自从切换工作开始到现在,我们实现了在生产环境中所有依赖于Eureka的产品没有下线维护的记录。我们也被告知过,在云平台做服务迁移注定要遇到失败;但是我们从这个例子中得到的经验是,一个优秀的Service发现服务在其中发挥了至关重要的作用!首先,在Eureka平台中,如果某台服务器宕机,Eureka不会有类似于ZooKeeper的选举leader的过程;客户端请求会自动切换到新的Eureka节点;当宕机的服务器重新恢复后,Eureka会再次将其纳入到服务器集群管理之中;而对于它来说,所有要做的无非是同步一些新的服务注册信息而已。所以,再也不用担心有“掉队”的服务器恢复以后,会从Eureka服务器集群中剔除出去的风险了。Eureka甚至被设计用来应付范围更广的网络分割故障,并实现“0”宕机维护需求。当网络分割故障发生时,每个Eureka节点,会持续的对外提供服务(注:ZooKeeper不会):接收新的服务注册同时将它们提供给下游的服务发现请求。这样一来,就可以实现在同一个子网中(same side of partition),新发布的服务仍然可以被发现与访问。但是,Eureka做到的不止这些。正常配置下,Eureka内置了心跳服务,用于淘汰一些“濒死”的服务器;如果在Eureka中注册的服务,它的“心跳”变得迟缓时,Eureka会将其整个剔除出管理范围(这点有点像ZooKeeper的做法)。这是个很好的功能,但是当网络分割故障发生时,这也是非常危险的;因为,那些因为网络问题(注:心跳慢被剔除了)而被剔除出去的服务器本身是很”健康“的,只是因为网络分割故障把Eureka集群分割成了独立的子网而不能互访而已。幸运的是,Netflix考虑到了这个缺陷。如果Eureka服务节点在短时间里丢失了大量的心跳连接(注:可能发生了网络故障),那么这个 Eureka节点会进入”自我保护模式“,同时保留那些“心跳死亡“的服务注册信息不过期。此时,这个Eureka节点对于新的服务还能提供注册服务,对于”死亡“的仍然保留,以防还有客户端向其发起请求。当网络故障恢复后,这个Eureka节点会退出”自我保护模式“。所以Eureka的哲学是,同时保留”好数据“与”坏数据“总比丢掉任何”好数据“要更好,所以这种模式在实践中非常有效。最后,Eureka还有客户端缓存功能(注:Eureka分为客户端程序与服务器端程序两个部分,客户端程序负责向外提供注册与发现服务接口)。所以即便Eureka集群中所有节点都失效,或者发生网络分割故障导致客户端不能访问任何一台Eureka服务器;Eureka服务的消费者仍然可以通过 Eureka客户端缓存来获取现有的服务注册信息。甚至最极端的环境下,所有正常的Eureka节点都不对请求产生相应,也没有更好的服务器解决方案来解决这种问题时;得益于Eureka的客户端缓存技术,消费者服务仍然可以通过Eureka客户端查询与获取注册服务信息,这点很重要。 Eureka的构架保证了它能够成为Service发现服务。它相对与ZooKeeper来说剔除了Leader节点的选取或者事务日志机制,这样做有利于减少使用者维护的难度也保证了Eureka的在运行时的健壮性。而且Eureka就是为发现服务所设计的,它有独立的客户端程序库,同时提供心跳服务、服务健康监测、自动发布服务与自动刷新缓存的功能。但是,如果使用ZooKeeper你必须自己来实现这些功能。Eureka的所有库都是开源的,所有人都能看到与使用这些源代码,这比那些只有一两个人能看或者维护的客户端库要好。维护Eureka服务器也非常的简单,比如,切换一个节点只需要在现有EIP下移除一个现有的节点然后添加一个新的就行。Eureka提供了一个 web-based的图形化的运维界面,在这个界面中可以查看Eureka所管理的注册服务的运行状态信息:是否健康,运行日志等。Eureka甚至提供了Restful-API接口,方便第三方程序集成Eureka的功能。

如何使用Java API操作Hbase

先导入hbase的相关jar包。

再根据api进行操作。

package com.util;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.MasterNotRunningException;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.ZooKeeperConnectionException;

import org.apache.hadoop.hbase.client.Connection;

import org.apache.hadoop.hbase.client.ConnectionFactory;

import org.apache.hadoop.hbase.client.Delete;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.HTablePool;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.client.Table;

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

import org.apache.hadoop.hbase.filter.Filter;

import org.apache.hadoop.hbase.filter.FilterList;

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;

import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtil{

public static Configuration configuration;

static{

configuration= HBaseConfiguration.create();

configuration.set("hbase.zookeeper.property.clientPort","2181");

configuration.set("hbase.zookeeper.quorum","192.168.1.103");

configuration.set("hbase.master","192.168.1.103:600000");

}

public static void main(String[] args) throws IOException{

createTable("abc");

insertData("abc");

QueryAll("abc");

}

/**

*创建表

*@param tableName

*/

public static void createTable(String tableName){

System.out.println("start create table......");

try{

HBaseAdmin hBaseAdmin= new HBaseAdmin(configuration);

if(hBaseAdmin.tableExists(tableName)){//如果存在要创建的表,那么先删除,再创建

hBaseAdmin.disableTable(tableName);

hBaseAdmin.deleteTable(tableName);

System.out.println(tableName+" is exist,detele....");

}

HTableDescriptor tableDescriptor= new HTableDescriptor(TableName.valueOf(tableName));

tableDescriptor.addFamily(new HColumnDescriptor("column1"));

tableDescriptor.addFamily(new HColumnDescriptor("column2"));

tableDescriptor.addFamily(new HColumnDescriptor("column3"));

hBaseAdmin.createTable(tableDescriptor);

hBaseAdmin.close();

} catch(MasterNotRunningException e){

e.printStackTrace();

} catch(ZooKeeperConnectionException e){

e.printStackTrace();

} catch(IOException e){

e.printStackTrace();

}

System.out.println("end create table......");

}

/**

*插入数据

*@param tableName

*@throws IOException

*/

public static void insertData(String tableName) throws IOException{

System.out.println("start insert data......");

Connection connection= ConnectionFactory.createConnection(configuration);

Table table= connection.getTable(TableName.valueOf(tableName));

Put put= new Put("112233bbbcccc".getBytes());//一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值

put.add("column1".getBytes(), null,"aaa".getBytes());//本行数据的第一列

put.add("column2".getBytes(), null,"bbb".getBytes());//本行数据的第三列

put.add("column3".getBytes(), null,"ccc".getBytes());//本行数据的第三列

try{

table.put(put);

} catch(IOException e){

e.printStackTrace();

}

System.out.println("end insert data......");

}

/**

*删除一张表

*@param tableName

*/

public static void dropTable(String tableName){

try{

HBaseAdmin admin= new HBaseAdmin(configuration);

admin.disableTable(tableName);

admin.deleteTable(tableName);

} catch(MasterNotRunningException e){

e.printStackTrace();

} catch(ZooKeeperConnectionException e){

e.printStackTrace();

} catch(IOException e){

e.printStackTrace();

}

}

/**

*根据 rowkey删除一条记录

*@param tablename

*@param rowkey

*/

public static void deleteRow(String tablename, String rowkey){

try{

HTable table= new HTable(configuration, tablename);

List list= new ArrayList();

Delete d1= new Delete(rowkey.getBytes());

list.add(d1);

table.delete(list);

System.out.println("删除行成功!");

} catch(IOException e){

e.printStackTrace();

}

}

/**

*组合条件删除

*@param tablename

*@param rowkey

*/

public static void deleteByCondition(String tablename, String rowkey){

//目前还没有发现有效的API能够实现根据非rowkey的条件删除这个功能能,还有清空表全部数据的API操作

}

/**

*查询所有数据

*@param tableName

*@throws IOException

*/

public static void QueryAll(String tableName) throws IOException{

Connection connection= ConnectionFactory.createConnection(configuration);

Table table= connection.getTable(TableName.valueOf(tableName));

try{

ResultScanner rs= table.getScanner(new Scan());

for(Result r: rs){

System.out.println("获得到rowkey:"+ new String(r.getRow()));

for(KeyValue keyValue: r.raw()){

System.out.println("列:"+ new String(keyValue.getFamily())

+"====值:"+ new String(keyValue.getValue()));

}

}

} catch(IOException e){

e.printStackTrace();

}

}

/**

*单条件查询,根据rowkey查询唯一一条记录

*@param tableName

*/

public static void QueryByCondition1(String tableName){

HTablePool pool= new HTablePool(configuration, 1000);

HTable table=(HTable) pool.getTable(tableName);

try{

Get scan= new Get("abcdef".getBytes());//根据rowkey查询

Result r= table.get(scan);

System.out.println("获得到rowkey:"+ new String(r.getRow()));

for(KeyValue keyValue: r.raw()){

System.out.println("列:"+ new String(keyValue.getFamily())

+"====值:"+ new String(keyValue.getValue()));

}

} catch(IOException e){

e.printStackTrace();

}

}

/**

*单条件按查询,查询多条记录

*@param tableName

*/

public static void QueryByCondition2(String tableName){

try{

HTablePool pool= new HTablePool(configuration, 1000);

HTable table=(HTable) pool.getTable(tableName);

Filter filter= new SingleColumnValueFilter(Bytes

.toBytes("column1"), null, CompareOp.EQUAL, Bytes

.toBytes("aaa"));//当列column1的值为aaa时进行查询

Scan s= new Scan();

s.setFilter(filter);

ResultScanner rs= table.getScanner(s);

for(Result r: rs){

System.out.println("获得到rowkey:"+ new String(r.getRow()));

for(KeyValue keyValue: r.raw()){

System.out.println("列:"+ new String(keyValue.getFamily())

+"====值:"+ new String(keyValue.getValue()));

}

}

} catch(Exception e){

e.printStackTrace();

}

}

/**

*组合条件查询

*@param tableName

*/

public static void QueryByCondition3(String tableName){

try{

HTablePool pool= new HTablePool(configuration, 1000);

HTable table=(HTable) pool.getTable(tableName);

List<Filter> filters= new ArrayList<Filter>();

Filter filter1= new SingleColumnValueFilter(Bytes

.toBytes("column1"), null, CompareOp.EQUAL, Bytes

.toBytes("aaa"));

filters.add(filter1);

Filter filter2= new SingleColumnValueFilter(Bytes

.toBytes("column2"), null, CompareOp.EQUAL, Bytes

.toBytes("bbb"));

filters.add(filter2);

Filter filter3= new SingleColumnValueFilter(Bytes

.toBytes("column3"), null, CompareOp.EQUAL, Bytes

.toBytes("ccc"));

filters.add(filter3);

FilterList filterList1= new FilterList(filters);

Scan scan= new Scan();

scan.setFilter(filterList1);

ResultScanner rs= table.getScanner(scan);

for(Result r: rs){

System.out.println("获得到rowkey:"+ new String(r.getRow()));

for(KeyValue keyValue: r.raw()){

System.out.println("列:"+ new String(keyValue.getFamily())

+"====值:"+ new String(keyValue.getValue()));

}

}

rs.close();

} catch(Exception e){

e.printStackTrace();

}

}

}

OK,关于zookeeper java api和java开发api接口如何编写的内容到此结束了,希望对大家有所帮助。

oracle数据库查询?oracle与金蝶用友区别html中nav标签的用法?html中div标签用法大全