jason's blog


  • 首页

  • 归档

  • 分类

  • 标签

  • 关于

Cassandra 3.x 数据读取过程【译】

发表于 2020-08-11

前言

最近对 LSM 树、Cassandra 的读取过程感兴趣,奈何印象模糊,故去官网查阅、复习了一下。顺便译成此文。

Cassandra、HBase、LevelDB 均是采用了 LSM 树进行存储,故三者的读写过程都大同小异,都具有 commitLog、memTable、SSTable(LSM) 结构,理解本文可举一反三。

简介

读取时,Cassandra 会合并 MemTable 和至少一个 SSTable 的结果。

Cassandra 寻找数据的过程有多个步骤。从 MemTable 中的数据开始,以 SSTables 结尾:

  1. 检查 MemTable
  2. 检查行缓存(如果启用)
  3. 检查布隆过滤器
  4. 检查分区键缓存(如果启用)
  5. 如果在分区键缓存中找到了分区键,则直接转到压缩偏移量映射表;如果未找到分区键,则检查 Partition Summary ,选中 Partition Summary 后,访问对应的分区索引
  6. 使用压缩偏移量映射表定位磁盘上的数据
  7. 从磁盘上的 SSTable 中获取数据

MemTable

如果 MemTable 具有所需的分区数据,则读取该数据,然后将其与 SSTables 中的数据合并。访问 SSTable 数据的步骤如下。

Row Cache(行缓存)

它在所有数据库中都是典型的,读取速度最快,通过将访问频繁的数据放入内存中实现。操作系统页缓存(page cache)最适合提高读取性能,尽管行缓存也可以为读取密集型操作(读取操作占负载的95%)提供一些改进。行缓存禁止用于写密集型操作。如果启用行缓存,会将 SSTables 中的部分分区数据存储在内存中。在 Cassandra 2.2和更高版本中,它存储在堆外内存中,这样减轻了 JVM 的 GC 压力。当缓存已满时,行缓存使用 LRU 策略回收内存。

行缓存大小以及要存储的行数都是可配置的。配置存储行数很重要,可以使类似查询“最后10项”的速度非常快。如果启用了行缓存,则将从行缓存中读取所需的分区数据,从而有可能节省两次从磁盘上查找的时间。行缓存中存储的行是经常访问的行,在访问它们时,这些行将合并并从 SSTable 保存到行缓存中。存储后,数据可用于以后的查询。行缓存不可以通过写操作添加。如果对该行进行写操作,则该行缓存将失效,直到读取该行后才再次被缓存。同样,如果更新分区,则会从缓存中逐出整个分区。如果行缓存中没有想要的数据,则检查布隆过滤器。

Bloom Filter(布隆过滤器)

首先,Cassandra 会检查布隆过滤器,以查找哪些 SSTables 可能具有请求分区数据。布隆过滤器存储在堆外存储器中。每个 SSTable 都有一个关联的布隆过滤器。布隆过滤器可以确定 SSTable 不包含哪些分区的数据。布隆过滤器还可以查出相应分区数据存储在一个 SSTable 中的可能性。它可以通过缩小键池大小来提高查找分区键的速度。但是,由于布隆过滤器是一个概率函数,因此可能导致误报。布隆过滤器找出的 SSTable 中也可能没有对应的数据。如果布隆过滤器不可以确定是哪个 SSTable ,Cassandra 会检查分区键缓存。

每十亿个分区,布隆过滤器大约会占用1-2GB大小空间。在极端情况下,每行可以对应一个分区,因此在一台机器上可以轻松拥有数十亿个这样的条目。如果要以内存换取性能,则布隆过滤器是可调的。

Partition Key Cache(分区键缓存)

如果启用分区键缓存,会将分区索引存储在堆外内存中。键缓存占内存小,且可配置,每次“命中”都可以减少一次查找。如果在键缓存中找到分区键,则可以直接转到压缩偏移量映射表,从磁盘上找到对应的数据。分区键缓存一旦预热就可以更好地发挥作用,并且可以大大提高冷启动时读取的性能。如果节点上的内存非常有限,可以限定保存在键缓存中的分区键的数量。如果在键缓存中找不到分区键,则搜索 Partition Summary。

分区键缓存的大小以及存储在键缓存中的分区键的数量都是可配置的。

Partition Summary(分区稀疏索引?这个直接翻译不太好~)

Partition Summary 是一种堆外内存结构,存储了分区索引的采样(可以理解为分区稀疏索引~)。分区索引包含所有分区键,而 Partition Summary 每隔 X 个键进行采样,并映射索引文件中第 Xth 键的位置。例如,如果 Partition Summary 设置为每20个键采样一次,它会将 SSTable 文件的开头作为第一个键,然后是第20个键及其在文件中的位置,依此类推。虽然不如分区键的位置那么精确,但是 Partition Summary 可以缩短扫描时间。找到可能的分区键值的范围后,然后确定分区索引。

通过配置采样频率,您可以以内存换取性能,Partition Summary 的粒度越小,使用的内存就越多。配置 index_interval 属性可以更改采样频率。可以通过配置 index_summary_capacity_in_mb 属性设置占用固定大小的内存,默认为堆大小的5%。

Partition Index(分区索引)

分区索引位于磁盘上,存储了映射到所有分区键的位置。检查了 Partition Summary 中的分区键范围后,然后转到分区索引以查找所需分区键的位置。对范围内的列将进行遍历查找。使用找到的信息,分区索引进入压缩偏移量映射表,以在磁盘上找到包含数据的压缩块。如果必须搜索分区索引,则需要两次磁盘搜索才能找到所需的数据。

Compression offset map(压缩偏移量映射表)

压缩偏移量映射表存储了分区数据的确切位置。它存储在堆外内存中,可以通过分区键缓存或分区索引进行访问。从压缩偏移量映射表确定了磁盘位置后,就可以从对应的的 SSTable 中查询出被压缩的分区数据。

注意:在分区内,并非查询所有行的代价都相同。由于不需要查询分区级索引,因此查询分区的最开始(第一行,由自定义键决定)的开销稍低一些。

每TB数据,压缩偏移量映射表将占用 1-3 GB。压缩数据越多,压缩块的数量就越多,压缩偏移量映射表也就越大。尽管使用压缩偏移量映射表会消耗 CPU 资源,默认情况下还是会启用压缩。启用压缩将使页缓存更有效,通常来说,利大于弊。

翻译自官方文档

一文了解 Spring Starter 原理

发表于 2020-04-22

简介

Spring Starter相当于模块,它能将模块所需的依赖整合起来并对模块内的Bean根据环境( 条件)进行自动配置。使用者只需要依赖相应功能的Starter,无需做过多的配置和依赖,Spring Boot就能自动扫描并加载相应的模块。

Quick Start

我们以创建一个钉钉机器人 starter 为例

  1. 创建项目,添加必选依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
</dependencies>
  1. 创建配置类,用于读取配置信息
1
2
3
4
5
6
7
8
@ConfigurationProperties(prefix = "dingding.chatbot")
@Data
public class DingdingProperties {
private String secret;
private String accessToken;
private String webHook = "https://oapi.dingtalk.com/robot/send?access_token=";
private Double rate = 1D/6D;
}

@ConfigurationProperties 注解用于设置配置信息的前缀

例如:accessToken 在配置文件中dingding.chatbot.access-token = 123456

  1. 创建自动装配类,创建想要生成的对象并装配到容器中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Configuration
@EnableConfigurationProperties(DingdingProperties.class)
public class DingdingAutoConfiguration {

@Resource
private DingdingProperties dingdingProperties;

@Bean
@ConditionalOnMissingBean(DingChatBot.class)
public DingChatBot createDingdingBean() {
return new DingChatBot(
dingdingProperties.getSecret(),
dingdingProperties.getAccessToken(),
dingdingProperties.getWebHook() + dingdingProperties.getAccessToken(),
dingdingProperties.getRate());
}
}
  1. 创建 spring.factories 文件,用于 SpringBoot 扫描

内容如下:

1
2
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.souche.chatbot.dingding.DingdingAutoConfiguration

spring.factories 放在 resources/META-INF 目录下。
容器启动时,SpringFactoriesLoader 类会扫描该文件,将相应的类实例化装配到容器中。
关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
   //解析成 CLASS 并实例化
public static <T> List<T> loadFactories(Class<T> factoryClass, @Nullable ClassLoader classLoader) {
//获取 ClassLoader
ClassLoader classLoaderToUse = classLoader;
//解析成 CLASS
List<String> factoryNames = loadFactoryNames(factoryClass, classLoaderToUse);

List<T> result = new ArrayList<>(factoryNames.size());
for (String factoryName : factoryNames) {
//依次实例化
result.add(instantiateFactory(factoryName, factoryClass, classLoaderToUse));
}
//排序后返回
AnnotationAwareOrderComparator.sort(result);
return result;
}

public static List<String> loadFactoryNames(Class<?> factoryClass, @Nullable ClassLoader classLoader) {
String factoryClassName = factoryClass.getName();
return loadSpringFactories(classLoader).getOrDefault(factoryClassName, Collections.emptyList());
}


private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) {
//按 classLoader 保存已经解析出的类
MultiValueMap<String, String> result = cache.get(classLoader);
if (result != null) {
return result;
}

try {
//读取所有的 "META-INF/spring.factories" 文件
Enumeration<URL> urls = (classLoader != null ?
classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :
ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));
result = new LinkedMultiValueMap<>();
//按行读取 CLASS 绝对路径
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
UrlResource resource = new UrlResource(url);
//spring.factories 文件 解析成 KEY VALUE 形式
Properties properties = PropertiesLoaderUtils.loadProperties(resource);
for (Map.Entry<?, ?> entry : properties.entrySet()) {
List<String> factoryClassNames = Arrays.asList(
StringUtils.commaDelimitedListToStringArray((String) entry.getValue()));
result.addAll((String) entry.getKey(), factoryClassNames);
}
}
cache.put(classLoader, result);
return result;
}
catch (IOException ex) {
throw new IllegalArgumentException("Unable to load factories from location [" +
FACTORIES_RESOURCE_LOCATION + "]", ex);
}
}
  1. 添加 spring-boot-configuration-processor 依赖,可以在引用 starter 后自动读取相应的配置文件
1
2
3
4
5
6
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
<version>2.0.6.RELEASE</version>
</dependency>
  1. 使用

1)添加上面的依赖

2)添加配置

1
2
3
# 钉钉
dingding.chatbot.secret= 1231231123121
dingding.chatbot.access-token = 1231231123121

3)使用demo

1
2
3
4
5
6
7
8
9
10
@Autowired
private DingChatBot chatBot;

public void method() {
try {
chatBot.sendMsg(errorMsg);
} catch (RateLimiterException rateLimiterException) {
log.error(rateLimiterException.getMessage(), rateLimiterException);
}
}

源码参考

Flink 架构核心概念

发表于 2020-04-11

任务和算子链

先介绍一下:算子(Operator)

Logical Graph 是一种描述流处理程序的高阶逻辑有向图。边代表输入/输出关系、数据流和数据集之一。算子是 Logical Graph 的节点,执行某种操作,该操作通常由 Function 执行。Source 和 Sink 是数据输入和数据输出的特殊算子。

分布式计算中,Flink 将算子(operator)子任务(subtask) 链接成一个个的任务(task)。每个任务占一个单独的线程。把多个算子链接成一个任务减少了线程间切换和缓冲的开销,并且在降低延迟的同时提高了整体吞吐量。

下图的示例数据流由五个子任务执行,因此由五个并行线程执行。

Job Managers、Task Managers 和 Clients

Flink 运行环境包含两类进程:
JobManagers(“主管” master)主要用来协调分布式计算。它们负责调度任务、协调检查点(checkpoints,参见下面)、协调故障恢复等。至少得有一个 JobManager(必须得有个安排任务的),高可用环境下可以有多个 JobManagers,其中一个正常工作,其它的作为“备胎”。
TaskManagers(“搬砖工” woker)主要用来执行任务(确切来说是子任务),缓存产生的数据并交换数据流。TaskManager 也至少得有一个(也不能没有打工的…)。

JobManagers 和 TaskManagers 有多种启动方式:
直接在机器上启动称为 standalone 集群;
在容器或资源管理框架 中启动,如 YARN 或 Mesos,TaskManagers 会连接到 JobManagers,通知后者已经可用,然后开始工作。

客户端(Client)虽然不是运行时(runtime)和程序执行时的一部分,但它被用来准备数据流并向 JobManager 提交。提交完之后客户端就可以断开连接,或者保持连接来接收进度报告。客户端既可以作为 Java / Scala 程序启动,也可以在命令行中运行,如 ./bin/flink run …。

Task Slots 和资源

每个 worker(TaskManager)都是一个 JVM 进程,子任务通过其中不同的线程执行。每个 worker 可以接收任务的数量与它拥有的 task slots (可译为任务槽,至少一个)有关。

每个 task slot 可以认为是 TaskManager 的一份内存资源。例如,具有三个 slot 的 TaskManager 会将其管理的内存资源分成三等份,每个一份。划分资源可以避免子任务之间竞争资源,当然这也意味着它们拥有的资源大小是不可变的。不过 CPU 并没有隔离,只是平分了 woker 的内存资源。

用户可以通过调整 slot 的数量,调整子任务的隔离方式。若每个 TaskManager 只有一个 slot ,意味着每组任务占一个单独的 JVM 进程(例如,在一个单独的容器中启动)。如果一个 worker 有多个 slot ,则意味着多个子任务共享同一个 JVM。同一个 JVM 中的任务会共享 TCP 连接(通过多路复用技术)和心跳信息,还可以共享数据集和数据结构,从而降低整体开销。

默认情况下,Flink 允许来自同一个 job 的子任务共享 slot,即使它们是不同 task 的子任务。因此,一个 slot 可能会负责这个 job 的一整条路径(结合图1理解)。允许 slot 共享有两个好处:

Flink 集群需要的 slot 与 job 中使用的最高并行度恰好一样多。这样不需要计算程序总共包含多少个任务(任务可能具有多种并行度)。

资源利用率更高。slot 不共享时,简单的子任务(如:source/map())将会占用和复杂的子任务(如:window)一样多的资源。通过共享 slot,将示例中的并行度从 2 增加到 6 可以充分利用 slot 的资源,还可以确保繁重的子任务能在多个 TaskManagers 之间平均分配。

APIs 中包含了 resource group 机制,可以用来避免不必要的 slot 共享。

根据经验,slot 数量最好与 CPU 核数一致。使用超线程(hyper-threading)时,每个 slot 可以占用 2 个或更多的硬件线程。

Checkpoint

Flink 中的每个方法或算子都可以是有状态的。有状态的方法会在处理每个元素/事件的时候记录状态,从而使各种算子更加准确(因为可恢复)。Flink 使用 checkpoint(检查点)机制来保存状态。Checkpoint 能够恢复状态以及在数据流中的位置,从而保证无故障执行。

State Backends

不同类型的 state backend 会影响 key/values 索引存储时的数据结构。一种是将数据存储在基于内存的 HashMap 中,另一种会使用 RocksDB 存储。state backend 定义了数据结构的保存状态(state),定义了如何创建 key/values 的快照,并将该快照存储为 checkpoint 的一部分。

Savepoints

用 Data Stream API 编写的程序可以从 savepoint 恢复执行。Savepoints 可以保证在更新、升级代码和 Flink 集群配置时,不丢失任何状态。

Savepoints 可以理解为手动触发的 checkpoints,类似常规的 checkpoint 机制,它会对程序创建一个快照并将其保存到 state backend。程序会定期在 worker 上创建快照并生成 checkpoints。Flink 只需要最后一个完整的 checkpoint 来确保恢复,一旦创建好了新的 checkpoint,旧的就可以丢弃。

Savepoints 类似于 checkpoints,只不过是手动触发的,并且在新的 checkpoint 创建好后不会自动过期。你可以通过命令行来创建 Savepoints,或者在取消一个 job 时通过 REST API 来创建。

翻译自官方文档

5分钟了解算法开发的主要流程

发表于 2020-04-01 | 分类于 算法

推荐阅读时间:5分钟

简介

本文主要针对算法小白,用于了解算法开发的主要流程和术语。不当之处,还望包含。

主要流程如下:

  1. 问题抽象:确定问题类型。
    分类、回归、标注等
  2. 数据收集处理(特征工程)
    训练集、验证集、测试集
  3. 确定假设空间:找到合适的算法去进行预测
  4. 模型最优化:确定一个假设函数来确定假设空间的最优解
  5. 模型评估:判定模型的效果怎么样。
    若效果不好,则 1.换参数 2.换算法 3.特征工程进一步处理
  6. 模型预测:模型上线使用
    模型以API形式提供

基本术语

机器学习类型

监督学习与无监督学习

根据训练数据是否拥有标记信息,学习任务可分为监督学习(如分类、回归、标注)和无监督学习(如聚类)。

分类问题

若我们欲预测的是离散值,如:好瓜、坏瓜。此类学习任务称为分类。

回归问题

若欲预测的是连续值,如西瓜成熟度:0.91、0.65。此类学习任务称为回归。

标注问题

可以认为标注问题是分类问题的一个推广。

标注问题的输入是一个观测序列,输出的是一个标记序列或状态序列。也就是说,分类问题的输出是一个值,而标注问题输出是一个向量,向量的每个值属于一种标记类型。

聚类问题

通常,人们根据样本间的某种距离或者相似性来定义聚类,即把相似的(或距离近的)样本聚为同一类,而把不相似的(或距离远的)样本归在其他类。

聚类的目标:组内的对象相互之间是相似的(相关的),而不同组中的对象是不同的(不相关的)。组内的相似性越大,组间差别越大,聚类就越好。

数据集类型

训练集

作用:估计模型
学习样本数据集,通过匹配一些参数来建立一个分类器。建立一种分类的方式,主要是用来训练模型的。

验证集

作用:确定网络结构或者控制模型复杂程度的参数
对学习出来的模型,调整分类器的参数,如在神经网络中选择隐藏单元数。验证集还用来确定网络结构或者控制模型复杂程度的参数。

测试集

作用:检验最终选择最优的模型的性能如何
主要是测试训练好的模型的分辨能力(识别率等)

假设空间

假设空间与模型的关系

监督学习的目的在于学习一个由输入到输出的映射,这一映射由模型来表示。换句话说,学习的目的就在于找到最好的这样的模型。模型属于由输入空间到输出空间的映射的集合,这个集合就是假设空间。假设空间的确定意味着学习范围的确定。

参考:

《机器学习》周志华

《统计学习方法》李航

WebSocket 快速上手

发表于 2019-09-21

简介

本文介绍如何基于 SpringBoot 快速搭建 WebSocket 服务端和客户端。

WebSocket 使用场景

与 Http 协议相比,WebSocket 有两大优势:

  1. 支持服务端主动向客户端推送消息,不需要客户端进行轮询服务端。
  2. 节省网络带宽。维持一个长连接,客户端和服务端通信不需要频繁的建立连接。且互相沟通的Header非常小。

Quick Start

开始搞起

maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
<!--服务端依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.0.6.RELEASE</version> <!--版本自选-->
</dependency>

<!--客户端依赖-->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.4.0</version> <!--版本自选-->
</dependency>

服务端代码

新建一个 SpringBoot 应用。添加 MyWebSocket 类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@ServerEndpoint("/myWebSocket/{id}")
public class MyWebSocket {

public static String heartBeat = "HeartBeat";
public static ConcurrentHashMap<Long, MyWebSocket> webSocketMap = new ConcurrentHashMap<>();

/**
* 与某个客户端的连接会话,需要通过它来与客户端进行数据收发
*/
private Session session;
private Long id;

public static int getOnlineCount() {
return webSocketMap.size();
}


@OnOpen
public void onOpen(Session session, @PathParam("id") Long id) {
log.info("Open a webSocket. id={}", id);
this.session = session;
this.id = id;
webSocketMap.put(id, this);
}

@OnClose
public void onClose() {
webSocketMap.remove(id);
log.info("Close a webSocket. id:{}", id);
}

@OnMessage
public void onMessage(String message, Session session) {
if (heartBeat.equals(message)) {
try {
session.getBasicRemote().sendText(message);
log.debug("Receive a heartBeat message from client: {}. id:{}", message, id);
return;
} catch (IOException e) {
log.error("回复心跳信息失败!", e);
}
}
log.info("Receive a message from client: {}. id:{}", message, id);
}

@OnError
public void onError(Session session, Throwable error) {
log.error("Error while webSocket. ", error);
}


public Session getSession() {
return session;
}

public Long getId() {
return id;
}

}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;

@Slf4j
public class TestWebSocketClient extends WebSocketClient {
public static String heartBeat = "HeartBeat";


public TestWebSocketClient(URI serverUri, Draft protocolDraft) {
super(serverUri, protocolDraft);
}

public static void main(String[] args) throws URISyntaxException, InterruptedException {
//端口号与 SpringBoot 的 servlet 容器端口一致
String serverUrl = "ws://localhost:9292/myWebSocket/2000000000462815";
URI recognizeUri = new URI(serverUrl);
TestWebSocketClient client = new TestWebSocketClient(recognizeUri, new Draft_6455());
client.connectBlocking(5, TimeUnit.SECONDS);
client.send("This is a message from client. ");
while (true) {
client.send(heartBeat);
Thread.sleep(2000);
}
}

@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("Open a WebSocket connection on client. ");
}

@Override
public void onClose(int code, String reason, boolean remote) {
log.info("Close a WebSocket connection on client. {} {} {}", code, reason, remote);
}

@Override
public void onMessage(String message) {
log.info("WebSocketClient receives a message: " + message);
}

@Override
public void onError(Exception exception) {
log.error("WebSocketClient exception. ", exception);
}

}

WebSocketConfig

用 SpringBoot 运行应用时,需要再添加一个配置文件,将 ServerEndpointExporter 注入 Bean 容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
* 用spring boot运行应用时,打开 @Configuration 注释;使用 war 部署在tomcat时,关闭注释
*
*/

@Configuration
@Slf4j
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

结语

搞定,好快~

使用taro开发微信小程序——入门

发表于 2019-08-13

简介

taro 是什么

Taro 是一套遵循 React 语法规范的 多端开发 解决方案。现如今市面上端的形态多种多样,Web、React-Native、微信小程序等各种端大行其道,当业务要求同时在不同的端都要求有所表现的时候,针对不同的端去编写多套代码的成本显然非常高,这时候只编写一套代码就能够适配到多端的能力就显得极为需要。

使用 Taro,我们可以只书写一套代码,再通过 Taro 的编译工具,将源代码分别编译出可以在不同端(微信/百度/支付宝/字节跳动/QQ小程序、快应用、H5、React-Native 等)运行的代码。

支持多端开发转化

Taro 方案的初心就是为了打造一个多端开发的解决方案。目前 Taro 代码可以支持转换到 微信/百度/支付宝/字节跳动/QQ小程序 、快应用、 H5 端 以及 移动端(React Native)。

quick start

安装 taro cli

1
2
3
4
5
6
# 使用 npm 安装 CLI
$ npm install -g @tarojs/cli
# OR 使用 yarn 安装 CLI
$ yarn global add @tarojs/cli
# OR 安装了 cnpm,使用 cnpm 安装 CLI
$ cnpm install -g @tarojs/cli

demo 工程

可以网上下载一个 taro 的 demo 工程,我们以 taro-library 为例。

1
2
3
4
5
6
7
8
9
10
$ git clone https://github.com/imageslr/taro-library.git

$ cd taro-library

$ npm install 或者 yarn

$ npm run dev:weapp

// 新建一个终端,在项目根目录下执行
$ gulp mock

需要注意的是,最好使工程的依赖与 cli 版本一致,否则运行时可能会出现一些奇怪的错误。

导入 微信开发者工具

执行完 npm run dev:weapp 之后,将会在项目的 dist 文件夹中生成相应的小程序工程。

安装好 微信开发者工具 之后启动,选择导入项目。目录选择 taro-library 的 dist 文件夹,AppID选择测试号。

调试

easy-mock

数据是在本机mock的,如果想要真机调试。可以使用我司的 easy-mock 来 mock 在线数据,使用方便。

Java7 ConcurrentHashMap 简介

发表于 2019-04-30 | 分类于 Java

推荐阅读时间:15分钟

简介

Java7 ConcurrentHashMap 是线程安全的 HashMap。与 HashTable 的区别是,支持多个线程并发访问,吞吐量高。
结构如下:

如图,ConcurrentHashMap 就是 Segment 数组,每个 Segment 可以理解为一个 HashMap。同一时间,每个 Segment 只允许一个线程访问。Segment 的数量在初始化后不再允许更改,但是每个 Segment 的长度是可以改变的。

Unsafe 类

Unsafe 类是 jdk 提供的一个类,这个类提供了一些绕开 JVM 的更底层功能,基于它的实现可以提高效率。但它是一把双刃剑:正如它的名字所讲,它是 Unsafe 的,它所分配的内存需要手动 free(不被 GC 回收)。
ConcurrentHashMap 中反复用到了其中的几个方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/***
* Sets the value of the object field at the specified offset in the
* supplied object to the given value. This is an ordered or lazy
* version of <code>putObjectVolatile(Object,long,Object)</code>, which
* doesn't guarantee the immediate visibility of the change to other
* threads. It is only really useful where the object field is
* <code>volatile</code>, and is thus expected to change unexpectedly.
* 设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者
* 有延迟的<code>putObjectVolatile</cdoe>方法,并且不保证值的改变被其他线程立
* 即看到。只有在field被<code>volatile</code>修饰并且期望被意外修改的时候
* 使用才有用。
*
* @param obj the object containing the field to modify.
* 包含需要修改field的对象
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code>中long型field的偏移量
* @param value the new value of the field.
* field将被设置的新值
*/
public native void putOrderedObject(Object obj, long offset, Object value);
1
2
3
4
5
6
7
8
9
10
11
/***
* Retrieves the value of the object field at the specified offset in the
* supplied object with volatile load semantics.
* 获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义。
*
* @param obj the object containing the field to read.
* 包含需要去读取的field的对象
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code>中object型field的偏移量
*/
public native Object getObjectVolatile(Object obj, long offset);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/***
* Compares the value of the object field at the specified offset
* in the supplied object with the given expected value, and updates
* it if they match. The operation of this method should be atomic,
* thus providing an uninterruptible way of updating an object field.
* 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
* 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
*
* @param obj the object containing the field to modify.
* 包含要修改field的对象
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code>中object型field的偏移量
* @param expect the expected value of the field.
* 希望field中存在的值
* @param update the new value of the field if it equals <code>expect</code>.
* 如果期望值expect与field的当前值相同,设置filed的值为这个新值
* @return true if the field was changed.
* 如果field的值被更改
*/
public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);

初始化

见注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
//ssize 是不小于预设并发度且为2的幂数的最小值
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//segmentShift 和 segmentMask 是用于计算 Segment 位置的,int j =(hash >>> segmentShift) & segmentMask 。
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c 是每个 Segment 的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建 Segment 数组,并初始化 Segment[0]。
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

get

很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
//u 是该 Segment 在数组中的位置
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//Segment 不为空且 Segment 的 HashEntry 数组不为空时,遍历链表找到值返回;否则返回 null 。
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

put

put 较复杂:

1
2
3
4
5
6
7
8
9
10
11
12
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
//如果要插入的 Segment 为 null,执行 ensureSegment ,初始化它。
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
//调用 Segment 的 put 方法
return s.put(key, hash, value, false);
}

ensureSegment 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//使用 Segment[0] 的容量和负载因子初始化该 Segment
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//使用 CAS 将该 Segment 初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

实际执行插入的 Segment 的 put 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//尝试获取一次锁,获取到往下走;否则执行 scanAndLockForPut 方法来获取锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
//获取到头节点,如果链表为空,插入该值直接返回 null;
//否则进行遍历,找到相应 key 则进行修改,返回旧值,没找到则在最后插入该值然后返回 null。
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

用于获取锁的 scanAndLockForPut:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//循环调用 tryLock() 尝试获取锁(自旋),当对某一元素的自旋次数超过一定次数时将被阻塞
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//自旋开始时,先遍历找到对应的元素
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//MAX_SCAN_RETRIES:最大加锁尝试次数,单核下为1;多核下为64。
//如果已经超过最大尝试次数则停止自旋,当前线程被阻塞,休眠一直到该锁可以获取。
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//如果 retries 为偶数且该表头元素已经更改则重新开始自旋
else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

扩容

实际执行扩容的是 Segment 的 rehash 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
//容量翻倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//第一次遍历,找出最后面有哪些连续的元素扩容后会在相同的节点上
//根据概率来讲,平均后面 5/6 的数据都会在相同的节点上
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// 克隆分界节点(lastrun)之前的所有元素
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

isEmpty()、size()

isEmpty 与 size 两个方法与 HashMap 有些区别。因为它要遍历的对象是所有 Segment。直接对所有 Segment 加锁遍历十分影响性能,因此这两个方法用了共同的特殊技巧来实现。

isEmpty 方法会不加锁遍历两次,只要两次获取到的 modCount(修改次数)不一致或某个 Segment 的元素不为0,则认为不为空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean isEmpty() {
long sum = 0L;
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum += seg.modCount;
}
}
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L)
return false;
}
return true;
}

size 方法也会先执行两次不加锁的遍历,若两次获取的 size 的大小一致则直接返回,否则加锁重新获取 size。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//前两次遍历获取的 size 的大小不一致,则加锁获取 size。
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
//如果加锁了,需要进行解锁
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
//如果 size 已超过32位,则取 Integer.MAX_VALUE,否则取实际的 size。
return overflow ? Integer.MAX_VALUE : size;
}

个人陋见难免疏漏,不足之处还请多多指教。😄

江湖路远,我们下期再会。

Sentinel 使用简介

发表于 2019-03-18

Sentinel 是什么

Sentinel 是面向分布式服务架构的轻量级 流量控制框架,可以从 流量控制、熔断降级、系统负载保护 等多个维度来保护服务的稳定性。

流量控制

可以理解为限制某种请求的最大 QPS 或它能同时发起的最大线程数。控制行为可以是 直接拒绝、排队等候、缓慢启动 等。

熔断降级

当某种请求的连续的几次请求都超时或表现出其他异常,影响到服务的健康时,可以在接下来的一段时间内拒绝相同的请求,以保护服务健康。

系统负载保护

从整体维度对应用入口流量进行控制,结合应用的 Load、总体平均 RT(RequestTime)、入口 QPS 和线程数等几个维度的监控指标,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。

Sentinel 基本概念

资源

资源是 Sentinel 的关键概念。 它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。

规则

围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以 动态实时调整 。

Quick Start

1.添加 POM 依赖

Sentinel 的使用可以分为两个部分:

  • 核心库(Java 客户端):核心功能实现包。不依赖任何框架/库,能够运行于 Java 7 及以上的版本的运行时环境。
  • 控制台(Dashboard):Dashboard主要负责管理推送规则;监控;管理机器信息等。
1
2
3
4
5
6
7
8
9
10
11
12
<!-- 核心库 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.5.0</version>
</dependency>
<!-- 与控制台交互用 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.5.0</version>
</dependency>

2.定义资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
initFlowRules();
while (true) {
Entry entry = null;
try {
entry = SphU.entry("HelloWorld");
/*您的业务逻辑 - 开始*/
System.out.println("hello world");
/*您的业务逻辑 - 结束*/
} catch (BlockException e1) {
/*流控逻辑处理 - 开始*/
System.out.println("block!");
/*流控逻辑处理 - 结束*/
} finally {
if (entry != null) {
entry.exit();
}
}
}
}

3.定义规则

1
2
3
4
5
6
7
8
9
10
private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}

启动该 demo 工程,参数添加 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=data-gateway

4.启动控制台观察结果

下载 alibaba/Sentinel 工程,本地启动 sentinel-dashboard 模块,运行参数添加 -Dserver.port=8080 。

更多 demo 可参考 Sentinel Examples

规则详情

流量控制规则 (FlowRule)

重要属性:
Field 说明 默认值
resource 资源名,资源名是限流规则的作用对象
count 限流阈值 QPS 模式
grade 限流阈值类型,QPS 或线程数模式
limitApp 流控针对的调用来源 default,代表不区分调用来源
strategy 判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口 根据资源本身
controlBehavior 流控效果(直接拒绝 / 排队等待 / 慢启动模式) 直接拒绝
demo:
1
2
3
4
5
6
7
8
9
10
private void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule(resourceName);
// set limit qps to 20
rule.setCount(20);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
rules.add(rule);
FlowRuleManager.loadRules(rules);
}

更多内容可参考流量控制

熔断降级规则 (DegradeRule)

重要属性:
Field 说明 默认值
resource 资源名,资源名是限流规则的作用对象
count 阈值,单位为 ms
timeWindow 降级的时间,单位为 s
grade 降级模式,根据 RT 降级还是根据异常比例降级 RT
demo:
1
2
3
4
5
6
7
8
9
10
11
private void initDegradeRule() {
List<DegradeRule> rules = new ArrayList<>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
// 若连续 5 个请求的响应时间均超过 300 ms ,将会被熔断 10S
rule.setCount(300);
rule.setTimeWindow(10);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
}

更多内容可参考熔断降级

系统保护规则 (SystemRule)

重要属性:
Field 说明 默认值
highestSystemLoad 最大的 load1,参考值 -1 (不生效)
avgRt 所有入口流量的平均响应时间 -1 (不生效)
maxThread 入口流量的最大并发数 -1 (不生效)
qps 所有入口资源的 QPS -1 (不生效)
demo:
1
2
3
4
5
6
7
private void initSystemRule() {
List<SystemRule> rules = new ArrayList<>();
SystemRule rule = new SystemRule();
rule.setHighestSystemLoad(10);
rules.add(rule);
SystemRuleManager.loadRules(rules);
}

更多内容可参考系统自适应限流

授权规则 (AuthorityRule)

重要属性:
Field 说明 默认值
resource 资源名,资源名是限流规则的作用对象
limitApp 对应的黑名单/白名单,不同 origin 用 , 分隔,如 appA,appB
strategy 限制模式,AUTHORITY_WHITE 为白名单模式,AUTHORITY_BLACK 为黑名单模式 白名单
demo:
1
2
3
4
5
AuthorityRule rule = new AuthorityRule();
rule.setResource("test");
rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
rule.setLimitApp("appA,appB");
AuthorityRuleManager.loadRules(Collections.singletonList(rule));

更多内容可参考黑白名单控制

热点规则 (ParamFlowRule)

重要属性:
Field 说明 默认值
resource 资源名,必填
count 限流阈值,必填
paramIdx 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置
paramFlowItemList 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型
grade 限流模式 QPS 模式
demo:
1
2
3
4
5
6
7
8
9
10
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(0)
.setCount(5);
// 针对 int 类型的参数 PARAM_B,单独设置限流 QPS 阈值为 10,而不是全局的阈值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));

ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

更多内容可参考热点参数限流

volatile、CAS、synchronized、ReentrantLock 简介

发表于 2018-11-24 | 分类于 Java

推荐阅读时间:10分钟

简介

volatile、CAS、synchronized、ReentrantLock 都是多线程中需要理解的重要知识,本文把它们放一起对比下,做个简单的介绍,为后面分析concurrent包源码打好基础。
其中 volatile 和 CAS 是用来保证对变量的操作的线程安全性,synchronized 和 Lock 是用来保证多个操作的线程安全性。

一个实验

我们先通过一个小实验来简单了解下他们的使用方法和区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class AtomicLab {
private static final int LOOP_TIME = 500;
private static final Object LOCK = new Object();
private static Integer availableProcessors = Runtime.getRuntime().availableProcessors();
private static Lock lock = new ReentrantLock();

private static Integer i0 = 0;
private static volatile Integer i1 = 0;
private static Integer i2 = 0;
private static AtomicInteger i3 = new AtomicInteger();
private static Integer i4 = 0;

public static void main(String[] args) throws InterruptedException {

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("jasonLab-%d").build();
ExecutorService service = new ThreadPoolExecutor(availableProcessors + 1, availableProcessors * 2,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000), threadFactory);
for (int i = 0; i < 100; i++) {
service.execute(new TestThread0());
service.execute(new TestThread1());
service.execute(new TestThread2());
service.execute(new TestThread3());
service.execute(new TestThread4());
}
Thread.sleep(1000L);
System.out.println("i0 result is " + i0 + " , equal 50000 : " + (i0 == 50000));
System.out.println("i1 result is " + i1 + " , equal 50000 : " + (i1 == 50000));
System.out.println("i2 result is " + i2 + " , equal 50000 : " + (i2 == 50000));
System.out.println("i3 result is " + i3 + " , equal 50000 : " + (i3.get() == 50000));
System.out.println("i4 result is " + i4 + " , equal 50000 : " + (i4 == 50000));

service.shutdown();
}

static class TestThread0 implements Runnable {

@Override
public void run() {
for (int i = 0; i < LOOP_TIME; i++) {
i0 += 1;
}
}
}

static class TestThread1 implements Runnable {

@Override
public void run() {
for (int i = 0; i < LOOP_TIME; i++) {
i1++;
}
}
}

static class TestThread2 implements Runnable {

@Override
public void run() {
for (int i = 0; i < LOOP_TIME; i++) {
synchronized (LOCK) {
i2 += 1;
}
}
}
}

static class TestThread3 implements Runnable {

@Override
public void run() {
for (int i = 0; i < LOOP_TIME; i++) {
i3.getAndAdd(1);
}
}
}

static class TestThread4 implements Runnable {

@Override
public void run() {
lock.lock();
try {
for (int i = 0; i < LOOP_TIME; i++) {
i4++;
}
} finally {
lock.unlock();
}

}
}
}

运行上述 main 方法,一个可能的结果如下:

1
2
3
4
5
i0 result is 48646 , equal 50000 : false
i1 result is 48509 , equal 50000 : false
i2 result is 50000 , equal 50000 : true
i3 result is 50000 , equal 50000 : true
i4 result is 50000 , equal 50000 : true

上述实验是计算 100 个线程同时对同一个 i 进行i++操作的累加结果。
我们知道,i++操作其实分:读(getI())、改(i=i+1)、写(setI(i))三步进行的。
对于 i0,这三个操作都不具备原子性保证,所以多线程下难免会发生数据丢失的问题。而至于i1-i4,其实分别用到了标题中的四个知识点,我们依次介绍下它们。

volatile

i1 被 volatile 修饰,它是 Java 中的关键字,它修饰的变量具有可见性和原子性的特点。

可见性和原子性

可见性:如果一个变量具有可见性,可以理解为任意时刻得到的都是该变量的最新值。
原子性:指对该变量的操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其它线程干扰。

volatile 实现原理

volatile 修饰的变量在进行操作时,会在汇编代码中加上 Lock 前缀,这将导致两件事情:

  1. 所有处理器不会在本地内存中记录该变量,而是直接写到共享内存中。
  2. 所有处理器在读取该变量时,都直接从共享内存中读取。

结果分析

根据实现原理,我们可以得知:对 volatile 变量的读或写都可以保证原子性。也就是上面的第一步和第三步是原子性的操作,但是第二步修改操作时却不能保证。
当一个线程执行修改操作时,其他线程可能已经执行过写入操作了,所以当该线程执行写入操作时,就覆盖了前面的写入操作,导致数据丢失。

CAS

我们先看下 i3,可以看到它使用了原子更新整型:AtomicInteger,我们在进行累加时,使用了它的getAndAdd()方法。
这个方法其实最终调用了Unsafe.compareAndSwapInt()方法,这是个 native 方法,依赖 CAS(CompareAndSwap)原理实现。

CAS 实现原理

CAS 的实现使用了处理器提供的 CMPXCHG指令,这个指令也带有Lock前缀,在进行 CAS 操作时,会锁住相应的内存区域,其他不能操作相应内存区域的线程在外面循环进行尝试,实现多线程原子性。
进行 CAS 时,需要对三个值进行操作:现在的值、预期的值、要替换的值。只有当预期的值和当前值一致时,才会进行修改。

ABA问题

CAS 操作可能会出现这样的问题:变量的值原来是A,被其他线程修改为了B,后来又被修改回A,当该线程进行CAS操作时,发现预期值与当前值一致,进行了修改。而其实变量已经被修改过了,这样就可能会导致其他的问题。JDK1.5开始,提供了AtomicStampedReference类来解决这个问题,变量会加一个类似乐观锁的版本号:1A-2B-3A。这样就可以准确的判断变量是否被修改过了。

结果分析

根据原理,我们可以得知针对 i3 的每次修改都是原子性的,没啥好说的~
synchronized 和 ReentrantLock 也不再进行结果分析。

延伸

volatile 和 CAS 在 Java 中举足轻重。借一张图表示 Java concurrent 包的实现。

synchronized

synchronized 是 Java 提供的一个关键字,用来锁住一个对象,被锁的对象任意时刻只能被一个线程访问(同一个线程可以加多个锁进行重复访问)。
synchronized 修饰不同的地方,加的锁的类型也不一样:

  1. 修饰非静态方法,锁的是该方法所在的实例对象。
  2. 修饰静态方法,锁的是该类的类对象。
  3. 修饰代码块时,锁的是所指定的对象。

实现原理

任何一个对象都有一个 monitor 与之关联,当 monitor 被持有后,它就将处于锁定状态。synchronized 就是通过获取和释放 monitor 实现的。

锁状态

大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低,Java6 开始,引入了偏向锁和轻量级锁的概念。

偏向锁

获取到锁后,锁默认处于偏向锁状态,在锁对象的对象头中储存一个线程ID,当下次该线程尝试获取该锁时,不需要进行循环CAS取锁,只需要检测偏向锁的线程ID是否与之一致即可。
当多个线程对同一个锁竞争激烈时,偏向锁会升级为轻量级锁。

轻量级锁

加锁:
线程在执行同步块之前,JVM会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的 Mark Word 复制到锁记录中,官方称为 Displaced Mark Word。
然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。
解锁:
轻量级解锁时,会使用原子的CAS操作将Displaced Mark Word替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。

轻量级锁能提高程序同步性能的依据是“对于绝大部分的锁,在整个同步周期内都是不存在竞争的”,这是一个经验数据。如果没有竞争,轻量级锁使用CAS操作避免了使用互斥量的开销,
但如果存在锁竞争,除了互斥量的开销外,还额外发生了CAS操作,因此在有竞争的情况下,轻量级锁会比传统的重量级锁更慢。

ReentrantLock

ReentrantLock 实现了 Lock 接口,也是 JDK 中该接口的唯一实现。Lock 接口是在Java5新增的,提供了与 synchronized 相似的功能。

与 synchronized 的区别

  1. ReentrantLock可以显示的进行加锁和解锁。
  2. ReentrantLock可中断的获取锁。
  3. ReentrantLock可以提供公平锁。
  4. ReentrantLock可以提供超时等待机制。

实现原理

ReentrantLock 的实现依赖于 AbstractQueueSynchronizer(AQS),它是实现锁或其他同步组件的基础框架。
AQS 内部维护了一个同步状态变量和一个同步队列,获取到该同步状态的线程视为获取到锁;获取失败的线程连同它的等待状态信息会被构造成加入到同步队列中,并阻塞它。
当同步状态被释放时,同步队列中的首节点会被唤醒尝试去获取同步状态。

读写锁

如果一段代码中大部分时间都在执行读操作,多个读操作同时进行不会影响线程安全性,这时前面提到的独占锁明显会影响多线程的读取性能。
ReentrantReadWriteLock是一个读写锁,多个获取了读锁之间的线程可以同步执行;而写锁不可以和读/写锁同步执行。
读写锁锁降级:一个线程在获取了写锁后,有获取了读锁,在释放写锁后,就变成了只获取了读锁,即锁降级。

Condition

ReentrantLock 使用 Condition 的 await()、signal()、signalAll()方法分别代替 Object 的wait()、notify()、notifyAll() 方法。

结语

以上是关于这四者的简单介绍,为了后面的系列内容做下铺垫,想要了解详情可以参考更多书籍、资料。


Java8 ArrayList 源码解读

发表于 2018-11-12 | 分类于 Java

推荐阅读时间:10分钟

简介

ArrayList 是日常开发中很常见的集合类型,在 Java 集合中相对容易阅读。它是基于数组实现的一种列表,读取、修改的时间复杂度很小(O(1)), 插入、remove 时时间复杂度为O(n)。ArrayList 可以存放 null 值,列表清空就是通过把所有的元素置为 null 实现的。

Arrays.copyOf() 和 System.arraycopy()

首先我们先看下代码里反复出现的两个方法:Arrays.copyOf() 和 System.arraycopy()。其实 public static <T> T[] copyOf(T[] original, int newLength) 是通过调用后者实现的,输入待拷贝的数组和要返回数组的长度,拷贝出一个新的数组。而public static native void arraycopy(Object src, int srcPos, Object dest, int destPos,int length)是真正执行拷贝的方法。它是个 native 方法,五个参数分别代表 待拷贝数组、待拷贝数组的起始位置、目标数组、目标数组的插入位置、拷贝的长度。每次拷贝都是全量拷贝,因此容量变化的操作较多时,会对它造成性能影响。

RandomAccess

ArrayList 实现了 RandomAccess 接口表示支持快速随机访问,将使用 for 循环查找元素。如果没有实现该接口(如 LinkedList),在查找时,只能通过 迭代器 进行查找,查找速度要低于前者。

Java doc 中具体解释如下:

1
2
3
4
5
6
7
8
9
* <pre>
* for (int i=0, n=list.size(); i &lt; n; i++)
* list.get(i);
* </pre>
* runs faster than this loop:
* <pre>
* for (Iterator i=list.iterator(); i.hasNext(); )
* i.next();
* </pre>

⭐注释版源码⭐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685

public class ArrayList<E> extends AbstractList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
private static final long serialVersionUID = 8683452581122892189L;

/**
* 默认的初始化容量:10
*/
private static final int DEFAULT_CAPACITY = 10;

/**
* Object[],用来存储列表元素
*/
transient Object[] elementData; // non-private to simplify nested class access
/**
* 按默认容量创建的空列表共享的存储对象
*/
private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};
/**
* 指定容量创建的空列表共享的存储对象
*/
private static final Object[]
EMPTY_ELEMENTDATA = {};
/**
* 列表中元素数量
*/
private int size;

/**
* 构造函数,指定容量大小
*/
public ArrayList(int initialCapacity) {
if (initialCapacity > 0) {
this.elementData = new Object[initialCapacity];
} else if (initialCapacity == 0) {
//如果指定容量为0,使用 EMPTY_ELEMENTDATA
this.elementData = EMPTY_ELEMENTDATA;
} else {
throw new IllegalArgumentException("Illegal Capacity: "+
initialCapacity);
}
}

/**
* 无参构造函数
*/
public ArrayList() {
//使用 DEFAULTCAPACITY_EMPTY_ELEMENTDATA
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}

/**
* 构造函数,入参为一个集合类型
*/
public ArrayList(Collection<? extends E> c) {
//调用 Collection 的 toArray() 方法,转换为 Object[]
elementData = c.toArray();
if ((size = elementData.length) != 0) {
// c.toArray 方法可能会转换出错,导致生成 Object[] 失败
if (elementData.getClass() != Object[].class)
elementData = Arrays.copyOf(elementData, size, Object[].class);
} else {
// replace with empty array.
this.elementData = EMPTY_ELEMENTDATA;
}
}

/**
* 按实际元素数量重新申请存储空间以减少内存使用
*/
public void trimToSize() {
//修改次数。记录该 ArrayList 对象修改次数,防止并发执行修改操作导致数据不一致。
modCount++;
if (size < elementData.length) {
elementData = (size == 0)
? EMPTY_ELEMENTDATA
: Arrays.copyOf(elementData, size);
}
}

/**
* 确保容量足够,如果容量不够就扩容
*/
public void ensureCapacity(int minCapacity) {
//如果是按默认容量创建的空 ArrayList,则当指定的容量超过10时,才会扩容
int minExpand = (elementData != DEFAULTCAPACITY_EMPTY_ELEMENTDATA)
// any size if not default element table
? 0
// larger than default for default empty table. It's already supposed to be at default size.
: DEFAULT_CAPACITY;

if (minCapacity > minExpand) {
ensureExplicitCapacity(minCapacity);
}
}

//计算最小容量
private static int calculateCapacity(Object[] elementData, int minCapacity) {
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
return Math.max(DEFAULT_CAPACITY, minCapacity);
}
return minCapacity;
}

//计算出需要扩容的最小容量然后确保增加到该容量
private void ensureCapacityInternal(int minCapacity) {
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}

//确保增加到指定的容量
private void ensureExplicitCapacity(int minCapacity) {
modCount++;

// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}

/**
* 元素最大数量。因为有的虚拟机预留了用于保存数组对象大小等信息的元数据,故减去了8位。
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* 实际执行 扩容方法
*/
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
//扩容到原来的 1.5倍
int newCapacity = oldCapacity + (oldCapacity >> 1);
//如果扩容后仍小于要最小容量则直接取最小容量作为新的容量
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
//如果扩容后大于最大允许的容量,则执行 hugeCapacity
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}

/**
* 求扩容容量,如果实际指定的最小容量超过 MAX_ARRAY_SIZE ,
* 则取 Integer.MAX_VALUE。否则取 MAX_ARRAY_SIZE。
*/
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}

/**
* 返回实际元素个数
*/
public int size() {
return size;
}

/**
* 是否为空
*/
public boolean isEmpty() {
return size == 0;
}

/**
* 是否包含一个元素
*/
public boolean contains(Object o) {
return indexOf(o) >= 0;
}

/**
* 求指定元素首次出现的下标。-1 表示不存在
*/
public int indexOf(Object o) {
//单独考虑所查元素为 null 时的情况
if (o == null) {
for (int i = 0; i < size; i++)
if (elementData[i]==null)
return i;
} else {
for (int i = 0; i < size; i++)
if (o.equals(elementData[i]))
return i;
}
return -1;
}

/**
* 指定元素最后一次出现的下标。靠从后面遍历来实现的。
*/
public int lastIndexOf(Object o) {
if (o == null) {
for (int i = size-1; i >= 0; i--)
if (elementData[i]==null)
return i;
} else {
for (int i = size-1; i >= 0; i--)
if (o.equals(elementData[i]))
return i;
}
return -1;
}

/**
* 浅拷贝,只拷贝一个新的数组,元素未拷贝。
*/
public Object clone() {
try {
ArrayList<?> v = (ArrayList<?>) super.clone();
v.elementData = Arrays.copyOf(elementData, size);
v.modCount = 0;
return v;
} catch (CloneNotSupportedException e) {
// this shouldn't happen, since we are Cloneable
throw new InternalError(e);
}
}

/**
* 转换为 Object[]
*/
public Object[] toArray() {
return Arrays.copyOf(elementData, size);
}

/**
* 转换为指定类型数组。
* 注意:若指定数组类型不是列表元素的超类,则会报 ArrayStoreException 异常。
* 若指定数组为 null ,会报空指针异常。
*/

@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
if (a.length < size)
// Make a new array of a's runtime type, but my contents:
return (T[]) Arrays.copyOf(elementData, size, a.getClass());
System.arraycopy(elementData, 0, a, 0, size);
if (a.length > size)
a[size] = null;
return a;
}

// Positional Access Operations

@SuppressWarnings("unchecked")
E elementData(int index) {
return (E) elementData[index];
}

/**
* get
*/
public E get(int index) {
//检测下标是否越界
rangeCheck(index);

return elementData(index);
}

/**
* set
*/
public E set(int index, E element) {
rangeCheck(index);

E oldValue = elementData(index);
elementData[index] = element;
return oldValue;
}

/**
* add
*/
public boolean add(E e) {
//先判断容量是否足够
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}

public void add(int index, E element) {
rangeCheckForAdd(index);

ensureCapacityInternal(size + 1); // Increments modCount!!
System.arraycopy(elementData, index, elementData, index + 1,
size - index);
elementData[index] = element;
size++;
}

/**
* 根据下标删除
*/
public E remove(int index) {
rangeCheck(index);

modCount++;
E oldValue = elementData(index);
//需要移动的元素数量
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
//将数组最后一个元素设为 null,通过GC机制自动去回收空间
elementData[--size] = null; // clear to let GC do its work

return oldValue;
}

/**
* 根据元素删除
*/
public boolean remove(Object o) {
if (o == null) {
for (int index = 0; index < size; index++)
if (elementData[index] == null) {
fastRemove(index);
return true;
}
} else {
for (int index = 0; index < size; index++)
if (o.equals(elementData[index])) {
fastRemove(index);
return true;
}
}
return false;
}

/**
* 与 remove(int index) 方法基本一致,只是没有下标越界检查、不返回旧值。
* 提升删除性能,作为私有方法。
*/
private void fastRemove(int index) {
modCount++;
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--size] = null; // clear to let GC do its work
}

/**
* 所有元素全部置为 null
*/
public void clear() {
modCount++;

// clear to let GC do its work
for (int i = 0; i < size; i++)
elementData[i] = null;

size = 0;
}

/**
* 追加一个集合的元素
*/
public boolean addAll(Collection<? extends E> c) {
Object[] a = c.toArray();
int numNew = a.length;
ensureCapacityInternal(size + numNew); // Increments modCount
System.arraycopy(a, 0, elementData, size, numNew);
size += numNew;
return numNew != 0;
}

/**
* 插入一个集合的元素
*/
public boolean addAll(int index, Collection<? extends E> c) {
rangeCheckForAdd(index);

Object[] a = c.toArray();
int numNew = a.length;
ensureCapacityInternal(size + numNew); // Increments modCount

int numMoved = size - index;
if (numMoved > 0)
System.arraycopy(elementData, index, elementData, index + numNew,
numMoved);

System.arraycopy(a, 0, elementData, index, numNew);
size += numNew;
return numNew != 0;
}

/**
* 删除指定下标范围的元素
*/
protected void removeRange(int fromIndex, int toIndex) {
modCount++;
int numMoved = size - toIndex;
System.arraycopy(elementData, toIndex, elementData, fromIndex,
numMoved);

// clear to let GC do its work
int newSize = size - (toIndex-fromIndex);
for (int i = newSize; i < size; i++) {
elementData[i] = null;
}
size = newSize;
}

/**
* 检测下标是否越界
*/
private void rangeCheck(int index) {
if (index >= size)
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

/**
* add 和 addAll 方法中 检测下标是否越界。
*/
private void rangeCheckForAdd(int index) {
if (index > size || index < 0)
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

private String outOfBoundsMsg(int index) {
return "Index: "+index+", Size: "+size;
}

/**
* 删除在集合中出现的元素
* 可能会报 ClassCastException 和 空指针异常
*/
public boolean removeAll(Collection<?> c) {
Objects.requireNonNull(c);
return batchRemove(c, false);
}

/**
* 保留在集合中存在的元素
* 可能会报 ClassCastException 和 空指针异常
*/
public boolean retainAll(Collection<?> c) {
Objects.requireNonNull(c);
return batchRemove(c, true);
}

private boolean batchRemove(Collection<?> c, boolean complement) {
final Object[] elementData = this.elementData;
int r = 0, w = 0;
boolean modified = false;
try {
//遍历列表,根据 complement,选择是否保留元素
for (; r < size; r++)
if (c.contains(elementData[r]) == complement)
elementData[w++] = elementData[r];
} finally {
//如果遍历过程中报错了,将剩余未遍历的元素追加到不完整的新列表的后面
if (r != size) {
System.arraycopy(elementData, r,
elementData, w,
size - r);
w += size - r;
}
//如果列表有被修改,则将无效存储位置置为 null
if (w != size) {
// clear to let GC do its work
for (int i = w; i < size; i++)
elementData[i] = null;
modCount += size - w;
size = w;
modified = true;
}
}
//返回是否做了修改
return modified;
}

/**
* 对象序列化函数
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException{
// Write out element count, and any hidden stuff
int expectedModCount = modCount;
//执行默认的序列化函数,将除 elementData[] 外的属性序列化
s.defaultWriteObject();

// Write out size as capacity for behavioural compatibility with clone()
// 写入 size
s.writeInt(size);

// Write out all elements in the proper order.
// 将 elementData[] 中的元素序列化进去
for (int i=0; i<size; i++) {
s.writeObject(elementData[i]);
}
//序列化过程中对象被修改,则报 ConcurrentModificationException 异常
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
}

/**
* 反序列化函数
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
elementData = EMPTY_ELEMENTDATA;

// Read in size, and any hidden stuff
// 反序列化出除 elementData[] 外的属性
s.defaultReadObject();

// Read in capacity
// 读出 size ,可忽略
s.readInt(); // ignored

if (size > 0) {
// be like clone(), allocate array based upon size not capacity
// 计算出实际容量
int capacity = calculateCapacity(elementData, size);
// 检查是否转换为数组类型,容量是否小于0。此处实际上不需要第一个参数。
SharedSecrets.getJavaOISAccess().checkArray(s, Object[].class, capacity);
ensureCapacityInternal(size);


Object[] a = elementData;
// Read in all elements in the proper order.
// 执行反序列化
for (int i=0; i<size; i++) {
a[i] = s.readObject();
}
}
}

/**
* 返回一个迭代器 ListIterator
*/
public ListIterator<E> listIterator() {
return new ListItr(0);
}

/**
* 返回一个迭代器 ListIterator,指定初始迭代位置
*/
public ListIterator<E> listIterator(int index) {
if (index < 0 || index > size)
throw new IndexOutOfBoundsException("Index: "+index);
return new ListItr(index);
}

/**
* 返回一个迭代器 Iterator
*/
public Iterator<E> iterator() {
return new Itr();
}


/**
* 获取子list
*/
public List<E> subList(int fromIndex, int toIndex) {
subListRangeCheck(fromIndex, toIndex, size);
return new SubList(this, 0, fromIndex, toIndex);
}

//子list范围检查
static void subListRangeCheck(int fromIndex, int toIndex, int size) {
if (fromIndex < 0)
throw new IndexOutOfBoundsException("fromIndex = " + fromIndex);
if (toIndex > size)
throw new IndexOutOfBoundsException("toIndex = " + toIndex);
if (fromIndex > toIndex)
throw new IllegalArgumentException("fromIndex(" + fromIndex +
") > toIndex(" + toIndex + ")");
}


// Java8 新加,供函数式编程遍历
@Override
public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
final int expectedModCount = modCount;
@SuppressWarnings("unchecked")
final E[] elementData = (E[]) this.elementData;
final int size = this.size;
for (int i=0; modCount == expectedModCount && i < size; i++) {
action.accept(elementData[i]);
}
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
}

/**
* 返回一个 Spliterator 对象。
* Spliterator 是 Java8 新加的、可分割的迭代器(splitable iterator),对于并行处理的能力大大增强。
*/
@Override
public Spliterator<E> spliterator() {
return new ArrayListSpliterator<>(this, 0, -1, 0);
}

/**
* Predicate 是Java8 新加的类,可以理解为一个用来断言的类。
* 该方法应该是为了函数式编程新加的。
*/
@Override
public boolean removeIf(Predicate<? super E> filter) {
Objects.requireNonNull(filter);
// figure out which elements are to be removed
// any exception thrown from the filter predicate at this stage
// will leave the collection unmodified
int removeCount = 0;
final BitSet removeSet = new BitSet(size);
final int expectedModCount = modCount;
final int size = this.size;
for (int i=0; modCount == expectedModCount && i < size; i++) {
@SuppressWarnings("unchecked")
final E element = (E) elementData[i];
if (filter.test(element)) {
removeSet.set(i);
removeCount++;
}
}
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}

// shift surviving elements left over the spaces left by removed elements
final boolean anyToRemove = removeCount > 0;
if (anyToRemove) {
final int newSize = size - removeCount;
for (int i=0, j=0; (i < size) && (j < newSize); i++, j++) {
i = removeSet.nextClearBit(i);
elementData[j] = elementData[i];
}
for (int k=newSize; k < size; k++) {
elementData[k] = null; // Let gc do its work
}
this.size = newSize;
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
modCount++;
}

return anyToRemove;
}

/**
* Java8 新加,可以按指定规则替换所有元素。
* UnaryOperator 实现了 Function 接口,可以接收一个入参,处理后返回。
*/
@Override
@SuppressWarnings("unchecked")
public void replaceAll(UnaryOperator<E> operator) {
Objects.requireNonNull(operator);
final int expectedModCount = modCount;
final int size = this.size;
for (int i=0; modCount == expectedModCount && i < size; i++) {
elementData[i] = operator.apply((E) elementData[i]);
}
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
modCount++;
}

/**
* 排序,Comparator 指定排序规则
*/
@Override
@SuppressWarnings("unchecked")
public void sort(Comparator<? super E> c) {
final int expectedModCount = modCount;
//调用 Arrays 的排序方法,瞄了一眼,很复杂的样子...
Arrays.sort((E[]) elementData, 0, size, c);
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
modCount++;
}
}

writeObject 和 readObject

ArrayList 实现了 Serializable 接口,所以对象会被序列化。而存放元素的 elementData 中可能会存在元素数量比数组容量小很多的情况,序列化时就会造成大量的空间浪费,因此通过实现 writeObject 和 readObject 方法,即可重新定义序列化与反序列化的规则。ArrayList 在 elementData 前加上了 transient 取消其默认序列化规则,其他属性则执行默认的规则。

迭代器

iterator()方法会返回一个 Iterator 迭代器,遍历时较常见。
源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* AbstractList.Itr 的优化版 迭代器
*/
private class Itr implements Iterator<E> {
// 当前下标
int cursor; // index of next element to return
// 上一个元素的下标,-1 表示还没有上一个元素
int lastRet = -1; // index of last element returned; -1 if no such
int expectedModCount = modCount;

Itr() {}

public boolean hasNext() {
return cursor != size;
}

@SuppressWarnings("unchecked")
public E next() {
checkForComodification();
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}

public void remove() {
//
if (lastRet < 0)
throw new IllegalStateException();
checkForComodification();

try {
ArrayList.this.remove(lastRet);
cursor = lastRet;
lastRet = -1;
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}
//Java8 新加的 遍历方法,供函数式编程使用
@Override
@SuppressWarnings("unchecked")
public void forEachRemaining(Consumer<? super E> consumer) {
Objects.requireNonNull(consumer);
final int size = ArrayList.this.size;
int i = cursor;
if (i >= size) {
return;
}
final Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length) {
throw new ConcurrentModificationException();
}
while (i != size && modCount == expectedModCount) {
consumer.accept((E) elementData[i++]);
}
// update once at end of iteration to reduce heap write traffic
cursor = i;
lastRet = i - 1;
checkForComodification();
}

// 检查是否被修改过
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
}

listIterator(int index)、listIterator()这两个方法返回的是 ListIterator 迭代器,与 Iterator 相比,它支持反向遍历和 add() 方法,比较容易理解,不再赘述。

此外,还有一个 spliterator()方法,它返回的是一个 Java8 新加的 Spliterator 迭代器。Spliterator 是一个可分割迭代器(splitable iterator),为了并行遍历元素而设计。如果有机会我们再分析它。

sublist

ArrayList 提供的public List<E> subList(int fromIndex, int toIndex)方法允许返回一个子list。
根据注释得知:

  1. 该方法返回的是父list的一个视图,从fromIndex(包含),到toIndex(不包含)。fromIndex=toIndex 表示子list为空
  2. 父子list做的非结构性修改(non-structural changes)都会影响到彼此:所谓的“非结构性修改”,是指不涉及到list的大小改变的修改。相反,结构性修改,指改变了list大小的修改。
  3. 对于结构性修改,子list的所有操作都会反映到父list上。但父list的修改将会导致返回的子list失效。
  4. tips:删除list中的某段数据的方法:list.subList(from, to).clear();

觉得有点收获的同学可以在手机上点击这个链接 免费领取一杯咖啡(瑞幸咖啡券,使用后我也得一张😃)

12…4
jason song

jason song

创造美好!

32 日志
10 分类
45 标签
RSS
Links
  • 阿里中间件BLOG
  • Flink China
© 2020 jason song
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4