Northpark博客

Today does not walk, will have to run tomorrow.


  • 首页

  • 关于

  • 旅行

  • 微世界

  • 音乐

  • 读书

  • 归档

Storm学习笔记-第五章 Storm周边框架使用

Posted on 2019-07-18 | In java , storm

环境前置说明:

通过我们的客户端(终端,CRT,XShell)
ssh hadoop@hadoop000
ssh [email protected]
远程服务器的用户名是hadoop,密码也是hadoop
有没有提供root权限,sudo command
hadoop000(192.168.199.102)是远程服务器的hostname
如果你想在本地通过ssh hadoop@hadoop000远程登录,
那么你本地的hosts肯定要添加ip和hostname的映射
192.168.199.102 hadoop000

JDK的安装

将所有的软件都安装到~/app
tar -zxvf jdk-8u91-linux-x64.tar.gz -C ~/app/
建议将jdk的bin目录配置到系统环境变量中: ~/.bash_profile
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
export PATH=$JAVA_HOME/bin:$PATH
让系统环境变量生效
source ~/.bash_profile
验证
java -version

ZooKeeper安装

下载ZK的安装包:http://archive.cloudera.com/cdh5/cdh/5/
解压:tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ~/app/
建议ZK_HOME/bin添加到系统环境变量: ~/.bash_profile
export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0
export PATH=$ZK_HOME/bin:$PATH
让系统环境变量生效
source ~/.bash_profile
修改ZK的配置: $ZK_HOME/conf/zoo.cfg
dataDir=/home/hadoop/app/tmp/zookeeper
启动zk: $ZK_HOME/bin/
zkServer.sh start
验证: jps
多了一个QuorumPeerMain进程,就表示zk启动成功了
jps -m
jps -l

ELK:

www.elastic.co

Logstash 2.4.1

集中、转换和存储数据
Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的 “存储库” 中。(我们的存储库当然是 Elasticsearch。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
cd logstash-2.4.0
bin/logstash -e 'input { stdin { } } output { stdout {} }'
bin/logstash -e 'input { stdin { } } output { stdout {codec => json} }'

https://www.elastic.co/guide/en/logstash/2.4/plugins-inputs-file.html

first-pipeline.conf
input {
file {
path => "/Users/myusername/tutorialdata/*.log"
start_position => beginning
ignore_older => 0
}}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}}

bin/logstash -f first-pipeline.conf --configtest

Kafka概述

和消息系统类似

消息中间件:生产者和消费者

妈妈:生产者
你:消费者
馒头:数据流、消息

    正常情况下: 生产一个  消费一个
    其他情况:  
        一直生产,你吃到某一个馒头时,你卡住(机器故障), 馒头就丢失了
        一直生产,做馒头速度快,你吃来不及,馒头也就丢失了

    拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃

篮子/框: Kafka
    当篮子满了,馒头就装不下了,咋办?
    多准备几个篮子 === Kafka的扩容

Kafka架构

producer:生产者,就是生产馒头(老妈)
consumer:消费者,就是吃馒头的(你)
broker:篮子
topic:主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃

单节点单broker的部署及使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$KAFKA_HOME/config/server.properties

broker.id=0
listeners
host.name
log.dirs
zookeeper.connect

启动Kafka
kafka-server-start.sh
USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

kafka-server-start.sh $KAFKA_HOME/config/server.properties

创建topic: zk
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic

查看所有topic
kafka-topics.sh --list --zookeeper hadoop000:2181

发送消息: broker
kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

消费消息: zk
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning


--from-beginning的使用
【带这个参数消费所有消息】【不带这个参数,只消费客户端启动后的消息】

查看所有topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181
查看指定topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic hello_topic

单节点多broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
server-1.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-1
listeners=PLAINTEXT://:9093
broker.id=1

server-2.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-2
listeners=PLAINTEXT://:9094
broker.id=2

server-3.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-3
listeners=PLAINTEXT://:9095
broker.id=3

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic

kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic

Storm学习笔记-第四章 Storm编程

Posted on 2019-07-16 | In java , storm

ISpout

概述

核心接口(interface),负责将数据发送到topology中去处理
Storm会跟踪Spout发出去的tuple的DAG
ack/fail
tuple: message id
ack/fail/nextTuple是在同一个线程中执行的,所以不用考虑线程安全方面

核心方法

open: 初始化操作
close: 资源释放操作
nextTuple: 发送数据 core api
ack: tuple处理成功,storm会反馈给spout一个成功消息
fail:tuple处理失败,storm会发送一个消息给spout,处理失败

实现类

1
2
3
4
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
public interface IRichSpout extends ISpout, IComponent
DRPCSpout
ShellSpout

IComponent接口

概述:

public interface IComponent extends Serializable
为topology中所有可能的组件提供公用的方法

void declareOutputFields(OutputFieldsDeclarer declarer);
用于声明当前Spout/Bolt发送的tuple的名称
使用OutputFieldsDeclarer配合使用

实现类:

1
public abstract class BaseComponent implements IComponent

IBolt接口

概述

职责:接收tuple处理,并进行相应的处理(filter/join/….)
hold住tuple再处理
IBolt会在一个运行的机器上创建,使用Java序列化它,然后提交到主节点(nimbus)上去执行
nimbus会启动worker来反序列化,调用prepare方法,然后才开始处理tuple处理

方法

prepare:初始化
execute:处理一个tuple数据,tuple对象中包含了元数据信息
cleanup:shutdown之前的资源清理操作

实现类:

1
2
3
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
public interface IRichBolt extends IBolt, IComponent
RichShellBolt

求和案例

需求:1 + 2 + 3 + …. = ???
实现方案:

Spout发送数字作为input
使用Bolt来处理业务逻辑:求和
将结果输出到控制台

拓扑设计: DataSourceSpout –> SumBolt

Read more »

Storm学习笔记-基础知识123

Posted on 2019-06-28 | In java , storm

storm学习

学习途径

1、官方文档

http://storm.apache.org/index.html

2、GitHub,有案例

https://github.com/apache/storm

3、一个官方文档简体中文版的翻译

http://python-storm-tutorial.readthedocs.io/zh_CN/latest/

多尝试、多思考、遇到问题看日志

enter description here
enter description here
enter description here

1、什么是storm

Apache Storm is a free and open source distributed realtime computation system.
免费、开源、分布式、实时计算系统
Storm makes it easy to reliably process unbounded streams of data
unbounded:无界,源源不断
bounded:Hadoop/spark SQL 离线 (input–>output)
doing for realtime processing what Hadoop did for batch processing
storm:实时流处理
Hadoop:离线批处理

2、storm能做什么?

Storm has many use cases:
realtime analytics:实时分析
online machine learning:在线机器学习
continuous computation:持续计算
distributed RPC,
ETL:
and more.

3、storm的特点

fast: over a million tuples processed per second per node 。一秒1亿
scalable(可添加机器)
fault-tolerant 容错
guarantees your data will be processed 保证每条数据被处理
easy to set up and operate.
storm能实现高频数据和大规模数据的实时处理

4、storm发展历史

storm产生于twitter
需求:大户数的实时处理
实时系统要考虑:
1)健壮性
2)拓展性/分布式
3)数据不丢失不重复
4)高性能低延时

5、storm和Hadoop的区别

处理过程
Hadoop—–> map reduce
storm —–> spout bolt
storm进程不杀死不结束
Hadoop进程完成就结束
处理速度: storm 快

6、发展趋势

看:
社区的发展、活跃度
企业的需求
大数据的相关大会,如storm的数量上升
互联网公司使用度

storm 核心概念

课程目录

storm 核心概念

1. Topologies  把顺序串起来的东西/ 调度中心
2. Streams   流,数据流,水流
3. Spouts     产生数据、水流的东西(水龙头)
4. Bolts        处理数据/水流的东西   水壶/水桶
5. Tuple      数据/水
6. Stream groupings
7. Reliability
8. Tasks
9. Workers

Storm核心概念总结

Topology:计算拓扑, 由spout和bolt组成的

Stream:消息流,抽象概念,没有边界的tup le构成

Tuple:消息/数据传递的基本 单元

Spout:消息流的源头,Topology的消息生产者

Bolt:消息处理单元,可以做过滤、聚合、查询/写数据库的操作

Storm核心概念总结

storm概念理解模型

storm理解模型

storm理解模型

最简单实现高并发插入数万条数据(可同步可异步)

Posted on 2019-03-28 | In java

获取数据| 整理数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

public static int count = 0;

String uniqueTimeId = IDUtils.getInstance().getUniqueTimeId();

String sqlof1 = "select vc_scode from ( "+sqlExpr+" )";

String sqlof2 = SqlExprConstant.pool_sql.replace(SqlExprConstant.POOL_TEMPLATE, poolId);

String joiner = " minus ";

//查询筛选结果
List<Map<String, Object>> reduce1 = riskruleService.executeSql(sqlof1+joiner+sqlof2);


//插入数据库 121_证券筛选执行结果
List<String> keys1 = reduce1.stream().filter(map -> !Objects.isNull(map.get("VC_SCODE")))
.map(i ->(String)i.get("VC_SCODE"))
.distinct().collect(Collectors.toList());


//并发插入数据
CurrencyAdd(keys1.size(), 100, keys1, uniqueTimeId, "1");

高并发插入上万条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 高并发插入上万条数据
* @param totalCount
* @param threadTotal
* @throws InterruptedException
* @author Bruce
*/
public void CurrencyAdd(int totalCount ,int threadTotal,List<String> keys,String uniqueTimeId,String cCompType) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
// 限制同时执行的线程数
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(totalCount);

for (int i = 0; i < totalCount; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add(cCompType);
//根据下标获取取值插入
log.info("current count no is --->{}",count);
CompSecuFilterRes res = new CompSecuFilterRes();
String code = "";

code = keys.get(count-1);

res.setVcScode(code );
res.setVcFilterSeqno(uniqueTimeId);
res.setCCompType(cCompType);
res.setDMdftime(new Date());
riskruleService.addCompSecuFilterRes(res);
semaphore.release();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
countDownLatch.countDown();

});
}

//异步不等待执行过程就把这行注掉
//countDownLatch.await();
executorService.shutdown();
log.info("count{}", count);
}

private synchronized static void add(String type ) {

count++;

}

日志信息

1
2
3
4
5
6
7
8
9
10
11
12
13
...

2019-03-27 at 18:38:37 CST INFO com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->97
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Preparing: insert into T_COMP_SECU_FILTER_RES (VC_FILTER_SEQNO, C_REC_TYPE, VC_SCODE, C_COMP_TYPE, D_MDFTIME) values (?, ?, ?, ?, ?)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Parameters: 1553683109754193(String), null, 011801593YH(String), 2(String), 2019-03-27(Date)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - <== Updates: 1
2019-03-27 at 18:38:37 CST INFO com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->98
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Preparing: insert into T_COMP_SECU_FILTER_RES (VC_FILTER_SEQNO, C_REC_TYPE, VC_SCODE, C_COMP_TYPE, D_MDFTIME) values (?, ?, ?, ?, ?)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Parameters: 1553683109754193(String), null, 041758030YH(String), 1(String), 2019-03-27(Date)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - <== Updates: 1
2019-03-27 at 18:38:37 CST INFO com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->99

...

高并发、多线程探索-8-并发容器J.U.C--AQS组件CountDownLatch、Semaphore、CyclicBarrier

Posted on 2019-01-03 | In java

AQS简介

AQS全名:AbstractQueuedSynchronizer,是并发容器J.U.C(java.lang.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向列表。

1

Sync queue:同步队列,是一个双向列表。包括head节点和tail节点。head节点主要用作后续的调度。

Condition queue:非必须,单向列表。当程序中存在cindition的时候才会存在此列表

AQS设计思想

使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。
利用int类型标识状态。在AQS类中有一个叫做state的成员变量

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

基于AQS有一个同步组件,叫做ReentrantLock。在这个组件里,stste表示获取锁的线程数,假如state=0,表示还没有线程获取锁,1表示有线程获取了锁。大于1表示重入锁的数量。
继承:子类通过继承并通过实现它的方法管理其状态(acquire和release方法操纵状态)。
可以同时实现排它锁和共享锁模式(独占、共享),站在一个使用者的角度,AQS的功能主要分为两类:独占和共享。它的所有子类中,要么实现并使用了它的独占功能的api,要么使用了共享锁的功能,而不会同时使用两套api,即便是最有名的子类ReentrantReadWriteLock也是通过两个内部类读锁和写锁分别实现了两套api来实现的。

AQS的大致实现思路

AQS内部维护了一个CLH队列来管理锁。线程会首先尝试获取锁,如果失败就将当前线程及等待状态等信息包装成一个node节点加入到同步队列sync
queue里。接着会不断的循环尝试获取锁,条件是当前节点为head的直接后继才会尝试。如果失败就会阻塞自己直到自己被唤醒。而当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

AQS组件:CountDownLatch

2

通过一个计数来保证线程是否需要被阻塞。实现一个或多个线程等待其他线程执行的场景。
我们定义一个CountDownLatch,通过给定的计数器为其初始化,该计数器是原子性操作,保证同时只有一个线程去操作该计数器。调用该类await方法的线程会一直处于阻塞状态。只有其他线程调用countDown方法(每次使计数器-1),使计数器归零才能继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum); //需要被等待的线程执行的方法
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();

CountDownLatch的await方法还有重载形式,可以设置等待的时间,如果超过此时间,计数器还未清零,则不继续等待:

1
2
3
4
countDownLatch.await(10, TimeUnit.MILLISECONDS);

//参数1:等待的时间长度
//参数2:等待的时间单位

AQS组件:Semaphore

用于保证同一时间并发访问线程的数目。
信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。Semaphore可以很容易控制系统中某个资源被同时访问的线程个数。
在数据结构中我们学过链表,链表正常是可以保存无限个节点的,而Semaphore可以实现有限大小的列表。
使用场景:仅能提供有限访问的资源。比如数据库连接。 Semaphore使用acquire方法和release方法来实现控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 1、普通调用
*/
try {
semaphore.acquire(); // 获取一个许可
test();//需要并发控制的内容
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}

/**
* 2、同时获取多个许可,同时释放多个许可
*/
try {
semaphore.acquire(2);
test();
semaphore.release(2);
} catch (Exception e) {
log.error("exception", e);
}

/*
* 3、尝试获取许可,获取不到不执行
*/
try {
if (semaphore.tryAcquire()) {
test(threadNum);
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}

/*
* 4、尝试获取许可一段时间,获取不到不执行
* 参数1:等待时间长度 参数2:等待时间单位
*/
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) {
test(threadNum);
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}

AQS组件:CyclicBarrier

3

也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共的屏障点(循环屏障)通过它可以完成多个线程之间相互等待,只有每个线程都准备就绪后才能继续往下执行后面的操作。
每当有一个线程执行了await方法,计数器就会执行+1操作,待计数器达到预定的值,所有的线程再同时继续执行。由于计数器释放之后可以重用(reset方法),所以称之为循环屏障。

与CountDownLatch区别:
1、计数器可重复用
2、描述一个或多个线程等待其他线程的关系/多个线程相互等待//公共线程循环调用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

//使用方法1:每个线程都持续等待
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}

//使用方法2:每个线程只等待一段时间
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
log.warn("BarrierException", e);
}
}

//使用方法3:在初始化的时候设置runnable,当线程达到屏障时优先执行runnable
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});

高并发、多线程探索-7-并发容器 J.U.C(java.util.concurrency) - 线程安全的集合与Map

Posted on 2018-12-29 | In java

概述

Java并发容器JUC是三个单词的缩写。是JDK下面的一个包名。即Java.util.concurrency。
上一节我们介绍了ArrayList、HashMap、HashSet对应的同步容器保证其线程安全,这节我们介绍一下其对应的并发容器。

ArrayList –> CopyOnWriteArrayList

CopyOnWriteArrayList
写操作时复制,当有新元素添加到集合中时,从原有的数组中拷贝一份出来,然后在新的数组上作写操作,将原来的数组指向新的数组。整个数组的add操作都是在锁的保护下进行的,防止并发时复制多份副本。读操作是在原数组中进行,不需要加锁

缺点:
1.写操作时复制消耗内存,如果元素比较多时候,容易导致young gc 和full gc。
2.不能用于实时读的场景.由于复制和add操作等需要时间,故读取时可能读到旧值。 能做到最终一致性,但无法满足实时性的要求,更适合读多写少的场景。
如果无法知道数组有多大,或者add,set操作有多少,慎用此类,在大量的复制副本的过程中很容易出错。

设计思想:
1.读写分离
2.最终一致性
3.使用时另外开辟空间,防止并发冲突

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//构造方法
public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements;//使用对象数组来承载数据
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
elements = c.toArray();
// c.toArray might (incorrectly) not return Object[] (see 6260652)
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}

//添加数据方法
public boolean add(E e) {
final ReentrantLock lock = this.lock;//使用重入锁,保证线程安全
lock.lock();
try {
Object[] elements = getArray();//获取当前数组数据
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//复制当前数组并且扩容+1
newElements[len] = e;//将要添加的数据放入新数组
setArray(newElements);//将原来的数组指向新的数组
return true;
} finally {
lock.unlock();
}
}

//获取数据方法,与普通的get没什么差别
private E get(Object[] a, int index) {
return (E) a[index];
}

HashSet –> CopyOnWriteArraySet

它是线程安全的,底层实现使用的是CopyOnWriteArrayList,因此它也适用于大小很小的set集合,只读操作远大于可变操作。因为他需要copy整个数组,所以包括add、remove、set它的开销相对于大一些。
迭代器不支持可变的remove操作。使用迭代器遍历的时候速度很快,而且不会与其他线程发生冲突。

源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//构造方法
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();//底层使用CopyOnWriteArrayList
}

//添加元素方法,基本实现原理与CopyOnWriteArrayList相同
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {//添加了元素去重操作
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

TreeSet –> ConcurrentSkipListSet

它是JDK6新增的类,同TreeSet一样支持自然排序,并且可以在构造的时候自己定义比较器。

同其他set集合,是基于map集合的(基于ConcurrentSkipListMap),在多线程环境下,里面的contains、add、remove操作都是线程安全的。
多个线程可以安全的并发的执行插入、移除、和访问操作。但是对于批量操作addAll、removeAll、retainAll和containsAll并不能保证以原子方式执行,原因是addAll、removeAll、retainAll底层调用的还是contains、add、remove方法,只能保证每一次的执行是原子性的,代表在单一执行操纵时不会被打断,但是不能保证每一次批量操作都不会被打断。在使用批量操作时,还是需要手动加上同步操作的。
不允许使用null元素的,它无法可靠的将参数及返回值与不存在的元素区分开来。

源码分析:

1
2
3
4
//构造方法
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();//使用ConcurrentSkipListMap实现
}
Read more »
12…16
Bruce

Bruce

Extreme ways

92 posts
45 categories
83 tags
RSS
GitHub Twitter Facebook
Links
  • NorthPark
  • 挖粪の男孩
  • 小白博客
© 2015 - 2019 Bruce
Powered by Hexo
NorthPark中文网
Mac破解软件
院线大片
情商提升
  本站访客数 人次   本站总访问量 次