0707.md

1.内网穿透ngroK
2.

单一应用架构

  • 当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。
  • 此时,用于简化增删改查工作量的 数据访问框架(ORM) 是关键。

垂直应用架构

  • 当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提升效率。
  • 此时,用于加速前端页面开发的 Web框架(MVC) 是关键。

分布式服务架构

  • 当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。
  • 此时,用于提高业务复用及整合的 分布式服务框架(RPC) 是关键。

流动计算架构:

  • 当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。
  • 此时,用于提高机器利用率的 资源调度和治理中心(SOA) 是关键。

微服务架构:随着敏捷开发、持续交付、DevOps理论的发展和实践,以及基于Docker等轻量级容器(LXC)部署应用和服务的成熟,微服务架构开始流行,逐渐成为应用架构的未来演进方向。通过服务的原子化拆分,以及微服务的独立打包、部署和升级,小团队敏捷缴费,应用的交付周期将缩短,运维成本也将大幅下降。

3.spring声明式事务和编程式事务
spring事务注入方式:
https://blog.csdn.net/yaerfeng/article/details/28390773

4.spring事务传播行为:
| 事务传播行为类型 | 说明
-|-|-
PROPAGATION_REQUIRED | 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。 |
PROPAGATION_SUPPORTS | 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY| 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW | 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED | 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER| 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED | 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

5.5种io模型
Stevens在文章中一共比较了五种IO Model:

* blocking IO  阻塞io
* nonblocking IO  费阻塞io
* IO multiplexing  io多路复用
* signal driven IO
* asynchronous IO   异步io
* [https://www.cnblogs.com/findumars/p/6361627.html](https://www.cnblogs.com/findumars/p/6361627.html)

6.pmml

Collectors.toMap的问题

概述

  虽然JDK9.0已经出来了,不过我们系统最近才开始全面引入JDK1.8,JDK1.8也已经出来了好久了,各方面都挺稳定的。最近在使用lambda表达式的Collectors.toMap方法时就遇到了一个问题。
大致源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Test {
public static void main(String[] args) {
// initMemberList为获取数据的方法
List<Member> list = Test.initMemberList();
Map<String, String> memberMap = list.stream().collect(Collectors.toMap(Member::getId, Member::getImgPath));
System.out.println(memberMap);
}
}

class Member {
private String id;
private String imgPath;

// get set省略
}

运行程序,直接提示:

1
2
3
4
5
6
7
8
9
10
11
Exception in thread "main" java.lang.NullPointerException
at java.util.HashMap.merge(HashMap.java:1224)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.jdk.test.Test.main(Test.java:13)

 想来想去,一开始一直没想明白为什么会为空指针呢,因为list是不可能为null的,无奈后来拿着java.util.HashMap.merge加NullPointerException异常上网搜索了一下。原来是由于在由对象转为Map的时候,转为map的value是null导致的。大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static List<Member> initMemberList() {
Member member1 = new Member();
member1.setId("id_1");
member1.setImgPath("http://www.baidu.com");
// 这里有一个null导致的
Member member2 = new Member();
member2.setId("id_2");
member2.setImgPath(null);

List<Member> list = new ArrayList<>();
list.add(member1);
list.add(member2);
return list;
}

大致看了下源码,原来Collectors.toMap底层是基于Map.merge方法来实现的,而merge中value是不能为null的,如果为null,就会抛出空指针异常,原来问题是这样的。

1
Collectors.toMap() internally uses Map.merge() to add mappings to the map. Map.merge() is spec'd not to allow null values, regardless of whether the underlying Map supports null values. This could probably use some clarification in the Collectors.toMap() specifications.

看了下,在openJDK的bug列表里还有这个呢:JDK-8148463,不知道这到底算不算bug呢。地址:Collectors.toMap fails on null values

问题归问题,我们还是需要通过其他的方式解决的。

解决方式1

原来for循环的方式,亦或是forEach的方式:

1
2
3
4
5
6
7
8
Map<String, String> memberMap = new HashMap<>();
list.forEach((answer) -> memberMap.put(answer.getId(), answer.getImgPath()));
System.out.println(memberMap);

Map<String, String> memberMap = new HashMap<>();
for (Member member : list) {
memberMap.put(member.getId(), member.getImgPath());
}
解决方式2

使用stream的collect的重载方法:

1
2
3
Map<String, String> memberMap = list.stream().collect(HashMap::new, (m,v)->
m.put(v.getId(), v.getImgPath()),HashMap::putAll);
System.out.println(memberMap);
解决方式3
1
> 继承Collector,手动实现toMap方法,然后调用我们自己封装的toMap方法就可以了。有关实现Collector,可参考:[JDK8 Stream API中Collectors中toMap方法的问题以及解决方案](http://blog.jobbole.com/104067/)

  其实不管这是不是bug,说到底,还是JDK1.8的lambda表达式用的太少,了解的太少导致的问题。所以说还是应该多去使用新技术,多踩坑。

stackoverflow地址:# Java 8 NullPointerException in Collectors.toMap

后记

使用Collector.toMap又发现了一个问题,Map中的key不能重复,如果重复的话,会抛出异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static List<Member> initMemberList() {
Member member1 = new Member();
member1.setId("id_1");
member1.setImgPath("http://www.google.com");

Member member2 = new Member();
member2.setId("id_1");
member2.setImgPath("http://www.baidu.com");

List<Member> list = new ArrayList<>();
list.add(member1);
list.add(member2);
return list;
}

以上代码,运行时,提示错误:

1
2
3
4
5
6
7
8
9
10
11
12
Exception in thread "main" java.lang.IllegalStateException: Duplicate key http://www.google.com
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1253)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.jdk.test.Test.main(Test.java:13)

通过查看Collectors.toMap的代码及注释我们会发现:

1
2
3
4
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
}
1
> If the mapped keys contains duplicates (according to Object#equals(Object)), an IllegalStateException is thrown when the collection operation is performed. If the mapped keys may have duplicates, use toMap(Function, Function, BinaryOperator) instead.

  所以说呢,我们使用的Collectors.toMap的方法是不支持key重复的,并且如果有重复的时候,建议我们使用toMap(Function, Function, BinaryOperator) 方法来替换使用,并且我们还可以定义当key重复的时候,是使用旧的数据还是使用新的数据呢,除了选择使用新旧数据,当然也可以做一些额外的操作,但该方法还是会有value为null的问题哦
即:

1
2
3
Map<String, String> memberMap = list.stream().collect(Collectors.toMap(Member::getId, Member::getImgPath,
(oldValue,newValue) -> oldValue));
System.out.println(memberMap);

同样,在openJDK的bug列表里自然也少不了这个小bug:JDK-8040892
Incorrect message in Exception thrown by Collectors.toMap(Function,Function)

不过,在JDK9里这个bug应该是被修复了的:JDK-8173464
Wrong exception message when collecting a stream to a map

Pallavi Sonal added a comment - 2017-01-27 00:42
This has been fixed in JDK 9 with JDK-8040892.
In JDK8 versions, it throws the wrong message i.e. instead of Duplicate key , it shows Duplicate key .

  再多说一句,toMap方法还有一个重载方法,是可以指定一个Map的具体实现,该方法或许有时候我们会用到呢。

1
2
3
Map<String, String> memberMap = list.stream().collect(Collectors.toMap(Member::getId, Member::getImgPath,
(oldValue,newValue) -> oldValue, HashMap::new));
System.out.println(memberMap);

hystrix请求命令

Hystrix有两个请求命令 HystrixCommand、HystrixObservableCommand。

  HystrixCommand用在依赖服务返回单个操作结果的时候。又两种执行方式

   -execute():同步执行。从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。

   -queue();异步执行。直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。

  HystrixObservableCommand 用在依赖服务返回多个操作结果的时候。它也实现了两种执行方式

   -observe():返回Obervable对象,他代表了操作的多个结果,他是一个HotObservable

   -toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。

HystrixCommand:

使用方式一:继承的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.hope.hystrix.example;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixRequestCache;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.client.RestTemplate;

/**
* Created by lisen on 2017/12/15.
* HystrixCommand用在命令服务返回单个操作结果的时候
*/
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;

public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}

@Override
protected String run() throws Exception {
int i = 1/0;
return "Hello " + name + "!";
}

/**
* 降级。Hystrix会在run()执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况时,
* 执行getFallBack()方法内的逻辑
*/
@Override
protected String getFallback() {
return "faild";
}
}

单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package org.hope.hystrix.example;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action1;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;

/**
* Created by lisen on 2017/12/15.
*
*/
public class CommandHelloWorldTest {

/**
* 测试同步执行
*/
@Test
public void testSynchronous() {
System.out.println(new CommandHelloWorld("World").execute());
}

/**
* 测试异步执行
*/
@Test
public void testAsynchronous() throws ExecutionException, InterruptedException {
Future<String> fWorld = new CommandHelloWorld("World").queue();
System.out.println(fWorld.get()); //一步执行用get()来获取结果
}

/**
* 虽然HystrixCommand具备了observe()和toObservable()的功能,但是它的实现有一定的局限性,
* 它返回的Observable只能发射一次数据,所以Hystrix还提供了HystrixObservableCommand,
* 通过它实现的命令可以获取能发多次的Observable
*/
@Test
public void testObserve() {
/**
* 返回的是Hot Observable,HotObservable,不论 “事件源” 是否有“订阅者”
* 都会在创建后对事件进行发布。所以对于Hot Observable的每一个“订阅者”都有
* 可能从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程
*/
//blocking
Observable<String> ho = new CommandHelloWorld("World").observe();
// System.out.println(ho.toBlocking().single());

//non-blockking
//- this is a verbose anonymous inner-class approach and doesn't do assertions
ho.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("==============onCompleted");
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(String s) {
System.out.println("=========onNext: " + s);
}
});

ho.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("==================call:" + s);
}
});
}

@Test
public void testToObservable() {
/**
* Cold Observable在没有 “订阅者” 的时候并不会发布时间,
* 而是进行等待,知道有 “订阅者” 之后才发布事件,所以对于
* Cold Observable的订阅者,它可以保证从一开始看到整个操作的全部过程。
*/
Observable<String> co = new CommandHelloWorld("World").toObservable();
System.out.println(co.toBlocking().single());
}

}

使用方式二:注解的方式;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package org.hope.hystrix.example.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import com.netflix.hystrix.contrib.javanica.command.AsyncResult;
import org.springframework.stereotype.Service;
import rx.Observable;
import rx.Subscriber;

import java.util.concurrent.Future;

/**
* 用@HystrixCommand的方式来实现
*/
@Service
public class UserService {
/**
* 同步的方式。
* fallbackMethod定义降级
*/
@HystrixCommand(fallbackMethod = "helloFallback")
public String getUserId(String name) {
int i = 1/0; //此处抛异常,测试服务降级
return "你好:" + name;
}

public String helloFallback(String name) {
return "error";
}

//异步的执行
@HystrixCommand(fallbackMethod = "getUserNameError")
public Future<String> getUserName(final Long id) {
return new AsyncResult<String>() {
@Override
public String invoke() {
int i = 1/0;//此处抛异常,测试服务降级
return "小明:" + id;
}
};
}

public String getUserNameError(Long id) {
return "faile";
}
}

单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.hope.hystrix.example.service;

import javafx.application.Application;
import org.hope.hystrix.example.HystrixApplication;
import org.hope.hystrix.example.model.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.concurrent.ExecutionException;


@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HystrixApplication.class)
public class UserServiceTest {

@Autowired
private UserService userService;

/**
* 测试同步
*/
@Test
public void testGetUserId() {
System.out.println("=================" + userService.getUserId("lisi"));
}

/**
* 测试异步
*/
@Test
public void testGetUserName() throws ExecutionException, InterruptedException {
System.out.println("=================" + userService.getUserName(30L).get());
}
}

HystrixObservableCommand:

使用方式一:继承的方式

  HystrixObservable通过实现 protected Observable construct() 方法来执行逻辑。通过 重写 resumeWithFallback方法来实现服务降级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package org.hope.hystrix.example;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ObservableCommandHelloWorld extends HystrixObservableCommand<String> {

private final String name;

public ObservableCommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}

@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("Hello");
int i = 1 / 0; //模拟异常
subscriber.onNext(name + "!");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}

/**
* 服务降级
*/
@Override
protected Observable<String> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("失败了!");
subscriber.onNext("找大神来排查一下吧!");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}

单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package org.hope.hystrix.example;

import org.junit.Test;
import rx.Observable;

import java.util.Iterator;

public class ObservableCommandHelloWorldTest {

@Test
public void testObservable() {
Observable<String> observable= new ObservableCommandHelloWorld("World").observe();

Iterator<String> iterator = observable.toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println(iterator.next());
}
}

@Test
public void testToObservable() {
Observable<String> observable= new ObservableCommandHelloWorld("World").observe();
Iterator<String> iterator = observable.toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println(iterator.next());
}
}

}

使用方式二:注解的方式;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package org.hope.hystrix.example.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import org.springframework.stereotype.Service;
import rx.Observable;
import rx.Subscriber;

@Service
public class ObservableUserService {
/**
* EAGER参数表示使用observe()方式执行
*/
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER, fallbackMethod = "observFailed") //使用observe()执行方式
public Observable<String> getUserById(final Long id) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("张三的ID:");
int i = 1 / 0; //抛异常,模拟服务降级
subscriber.onNext(String.valueOf(id));
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}

private String observFailed(Long id) {
return "observFailed---->" + id;
}

/**
* LAZY参数表示使用toObservable()方式执行
*/
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY, fallbackMethod = "toObserbableError") //表示使用toObservable()执行方式
public Observable<String> getUserByName(final String name) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("找到");
subscriber.onNext(name);
int i = 1/0; ////抛异常,模拟服务降级
subscriber.onNext("了");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}

private String toObserbableError(String name) {
return "toObserbableError--->" + name;
}

}

单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.hope.hystrix.example.service;

import org.hope.hystrix.example.HystrixApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.Iterator;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HystrixApplication.class)
public class ObservableUserServiceTest {

@Autowired
private ObservableUserService observableUserService;

@Test
public void testObserve() {
Iterator<String> iterator = observableUserService.getUserById(30L).toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println("===============" + iterator.next());
}
}

@Test
public void testToObservable() {
Iterator<String> iterator = observableUserService.getUserByName("王五").toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println("===============" + iterator.next());
}
}
}

总结:
  在实际使用时,我们需要为大多数执行过程中可能会失败的Hystrix命令实现服务降级逻辑,但是也有一些情况可以不去实现降级逻辑,比如:

  执行写操作的命令:

  执行批处理或离线计算的命令:

cas单点登陆

1、结构体系

在上面的文章中,我们介绍过,在CAS的结构中主要分两部分,一部分是CAS Server,另一部分是CAS Client。

CAS Server:CAS Server 负责完成对用户的认证工作 , 需要独立部署 , CAS Server 会处理用户名 / 密码等凭证(Credentials)。

CAS Client:负责处理对客户端受保护资源的访问请求,需要对请求方进行身份认证时,重定向到 CAS Server 进行认证。(原则上,客户端应用不再接受任何的用户名密码等 Credentials )。

1

1
2
3
4
5
6
7
SSO单点登录访问流程主要有以下步骤:
1. 访问服务:SSO客户端发送请求访问应用系统提供的服务资源。
2. 定向认证:SSO客户端会重定向用户请求到SSO服务器。
3. 用户认证:用户身份认证。
4. 发放票据:SSO服务器会产生一个随机的Service Ticket。
5. 验证票据:SSO服务器验证票据Service Ticket的合法性,验证通过后,允许客户端访问服务。
6. 传输用户信息:SSO服务器验证票据通过后,传输用户认证结果信息给客户端。

2、CAS协议

CAS协议是一个简单而强大的基于票据的协议,它涉及一个或多个客户端和一台服务器。即在CAS中,通过TGT(Ticket Granting Ticket)来获取 ST(Service Ticket),通过ST来访问具体服务。

其中主要的关键概念:

TGT(Ticket Granting Ticket)是存储在TGCcookie中的代表用户的SSO会话。
该ST(Service Ticket),作为参数在GET方法的URL中,代表由CAS服务器授予访问CASified应用程序(包含CAS客户端的应用程序)具体用户的权限。

3、CAS术语概念

1、CAS 系统中的票据: TGC、TGT 、 ST 、 PGT 、 PGTIOU 、 PT 。
(1)、TGC(ticket-granting cookie)
授权的票据证明,由 CAS Server 通过 SSL 方式发送给终端用户,存放用户身份认证凭证的Cookie,在浏览器和CAS Server间通讯时使用,并且只能基于安全通道传输(Https),是CAS Server用来明确用户身份的凭证。

(2)、TGT(Ticket Grangting Ticket)
TGT是CAS为用户签发的登录票据,拥有了TGT,用户就可以证明自己在CAS成功登录过。TGT封装了Cookie值以及此Cookie值对应的用户信息。用户在CAS认证成功后,CAS生成Cookie(叫TGC),写入浏览器,同时生成一个TGT对象,放入自己的缓存,TGT对象的ID就是Cookie的值。当HTTP再次请求到来时,如果传过来的有CAS生成的Cookie,则CAS以此Cookie值为key查询缓存中有无TGT ,如果有的话,则说明用户之前登录过,如果没有,则用户需要重新登录。

(3)、ST(Service Ticket)
ST是CAS为用户签发的访问某一service的票据。用户访问service时,service发现用户没有ST,则要求用户去CAS获取ST。用户向CAS发出获取ST的请求,如果用户的请求中包含Cookie,则CAS会以此Cookie值为key查询缓存中有无TGT,如果存在TGT,则用此TGT签发一个ST,返回给用户。用户凭借ST去访问service,service拿ST去CAS验证,验证通过后,允许用户访问资源。

(4)、PGT(Proxy Granting Ticket)
Proxy Service的代理凭据。用户通过CAS成功登录某一Proxy Service后,CAS生成一个PGT对象,缓存在CAS本地,同时将PGT的值(一个UUID字符串)回传给Proxy Service,并保存在Proxy Service里。Proxy Service拿到PGT后,就可以为Target Service(back-end service)做代理,为其申请PT。

(5)、PGTIOU(Proxy Granting Ticket I Owe You)
PGTIOU是CAS协议中定义的一种附加票据,它增强了传输、获取PGT的安全性。
PGT的传输与获取的过程:Proxy Service调用CAS的serviceValidate接口验证ST成功后,CAS首先会访问pgtUrl指向的Https URL,将生成的 PGT及PGTIOU传输给proxy service,proxy service会以PGTIOU为key,PGT为value,将其存储在Map中;然后CAS会生成验证ST成功的XML消息,返回给Proxy Service,XML消息中含有PGTIOU,proxy service收到XML消息后,会从中解析出PGTIOU的值,然后以其为key,在Map中找出PGT的值,赋值给代表用户信息的Assertion对象的pgtId,同时在Map中将其删除。

(6)、PT(Proxy Ticket)
PT是用户访问Target Service(back-end service)的票据。如果用户访问的是一个Web应用,则Web应用会要求浏览器提供ST,浏览器就会用Cookie去CAS获取一个ST,然后就可以访问这个Web应用了。如果用户访问的不是一个Web应用,而是一个C/S结构的应用,因为C/S结构的应用得不到Cookie,所以用户不能自己去CAS获取ST,而是通过访问proxy service的接口,凭借proxy service的PGT去获取一个PT,然后才能访问到此应用。

2、TGT、ST、PGT、PT之间关系
ST是TGT签发的。用户在CAS上认证成功后,CAS生成TGT,用TGT签发一个ST,ST的ticketGrantingTicket属性值是TGT对象,然后把ST的值redirect到客户应用。
PGT是ST签发的。用户凭借ST去访问Proxy service,Proxy service去CAS验证ST(同时传递PgtUrl参数给CAS),如果ST验证成功,则CAS用ST签发一个PGT,PGT对象里的ticketGrantingTicket是签发ST的TGT对象。
PT是PGT签发的。Proxy service代理back-end service去CAS获取PT的时候,CAS根据传来的pgt参数,获取到PGT对象,然后调用其grantServiceTicket方法,生成一个PT对象。

其他单点登陆:
https://blog.csdn.net/u012394095/article/details/79732224

paxos算法

算法内容

Paxos在原作者的《Paxos Made Simple》中内容是比较精简的:

1
2
3
4
5
6
7
8
9
10
11
> Phase 1
>
> (a) A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors.
>
> (b) If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered pro-posal (if any) that it has accepted.
>
> Phase 2
>
> (a) If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v , where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals.
>
> (b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

借用[paxos图解]文中的流程图可概括为:
1

实例及详解

Paxos中有三类角色ProposerAcceptorLearner,主要交互过程在ProposerAcceptor之间。

ProposerAcceptor之间的交互主要有4类消息通信,如下图:
2

这4类消息对应于paxos算法的两个阶段4个过程:

  • phase 1
    • a) proposer向网络内超过半数的acceptor发送prepare消息
    • b) acceptor正常情况下回复promise消息
  • phase 2
    • a) 在有足够多acceptor回复promise消息时,proposer发送accept消息
    • b) 正常情况下acceptor回复accepted消息

因为在整个过程中可能有其他proposer针对同一件事情发出以上请求,所以在每个过程中都会有些特殊情况处理,这也是为了达成一致性所做的事情。如果在整个过程中没有其他proposer来竞争,那么这个操作的结果就是确定无异议的。但是如果有其他proposer的话,情况就不一样了。

paxos中文wiki上的例子为例。简单来说该例子以若干个议员提议税收,确定最终通过的法案税收比例。

以下图中基本只画出proposer与一个acceptor的交互。时间标志T2总是在T1后面。propose number简称N。

情况之一如下图:
3

A3在T1发出accepted给A1,然后在T2收到A5的prepare,在T3的时候A1才通知A5最终结果(税率10%)。这里会有两种情况:

  • A5发来的N5小于A1发出去的N1,那么A3直接拒绝(reject)A5
  • A5发来的N5大于A1发出去的N1,那么A3回复promise,但带上A1的(N1, 10%)

这里可以与paxos流程图对应起来,更好理解。acceptor会记录(MaxN, AcceptN, AcceptV)

A5在收到promise后,后续的流程可以顺利进行。但是发出accept时,因为收到了(AcceptN, AcceptV),所以会取最大的AcceptN对应的AcceptV,例子中也就是A1的10%作为AcceptV。如果在收到promise时没有发现有其他已记录的AcceptV,则其值可以由自己决定。

针对以上A1和A5冲突的情况,最终A1和A5都会广播接受的值为10%。

其实4个过程中对于acceptor而言,在回复promise和accepted时由于都可能因为其他proposer的介入而导致特殊处理。所以基本上看在这两个时间点收到其他proposer的请求时就可以了解整个算法了。例如在回复promise时则可能因为proposer发来的N不够大而reject:
4

如果在发accepted消息时,对其他更大N的proposer发出过promise,那么也会reject该proposer发出的accept,如图:
5

这个对应于Phase 2 b):

1
> it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

总结

Leslie Lamport没有用数学描述Paxos,但是他用英文阐述得很清晰。将Paxos的两个Phase的内容理解清楚,整个算法过程还是不复杂的。

至于Paxos中一直提到的一个全局唯一且递增的proposer number,其如何实现,引用如下:

1
> 如何产生唯一的编号呢?在《Paxos made simple》中提到的是让所有的Proposer都从不相交的数据集合中进行选择,例如系统有5个Proposer,则可为每一个Proposer分配一个标识j(0~4),则每一个proposer每次提出决议的编号可以为5*i + j(i可以用来表示提出议案的次数)

原文:https://www.cnblogs.com/hugb/p/8955505.html

分布式事务

数据库事务

在说分布式事务之前,我们先从数据库事务说起。 数据库事务可能大家都很熟悉,在开发过程中也会经常使用到。但是即使如此,可能对于一些细节问题,很多人仍然不清楚。比如很多人都知道数据库事务的几个特性:原子性(Atomicity )、一致性( Consistency )、隔离性或独立性( Isolation)和持久性(Durabilily),简称就是ACID。但是再往下比如问到隔离性指的是什么的时候可能就不知道了,或者是知道隔离性是什么但是再问到数据库实现隔离的都有哪些级别,或者是每个级别他们有什么区别的时候可能就不知道了。

本文并不打算介绍这些数据库事务的这些东西,有兴趣可以搜索一下相关资料。不过有一个知识点我们需要了解,就是假如数据库在提交事务的时候突然断电,那么它是怎么样恢复的呢? 为什么要提到这个知识点呢? 因为分布式系统的核心就是处理各种异常情况,这也是分布式系统复杂的地方,因为分布式的网络环境很复杂,这种“断电”故障要比单机多很多,所以我们在做分布式系统的时候,最先考虑的就是这种情况。这些异常可能有 机器宕机、网络异常、消息丢失、消息乱序、数据错误、不可靠的TCP、存储数据丢失、其他异常等等…

我们接着说本地事务数据库断电的这种情况,它是怎么保证数据一致性的呢?我们使用SQL Server来举例,我们知道我们在使用 SQL Server 数据库是由两个文件组成的,一个数据库文件和一个日志文件,通常情况下,日志文件都要比数据库文件大很多。数据库进行任何写入操作的时候都是要先写日志的,同样的道理,我们在执行事务的时候数据库首先会记录下这个事务的redo操作日志,然后才开始真正操作数据库,在操作之前首先会把日志文件写入磁盘,那么当突然断电的时候,即使操作没有完成,在重新启动数据库时候,数据库会根据当前数据的情况进行undo回滚或者是redo前滚,这样就保证了数据的强一致性。

接着,我们就说一下分布式事务。

分布式理论

当我们的单个数据库的性能产生瓶颈的时候,我们可能会对数据库进行分区,这里所说的分区指的是物理分区,分区之后可能不同的库就处于不同的服务器上了,这个时候单个数据库的ACID已经不能适应这种情况了,而在这种ACID的集群环境下,再想保证集群的ACID几乎是很难达到,或者即使能达到那么效率和性能会大幅下降,最为关键的是再很难扩展新的分区了,这个时候如果再追求集群的ACID会导致我们的系统变得很差,这时我们就需要引入一个新的理论原则来适应这种集群的情况,就是 CAP 原则或者叫CAP定理,那么CAP定理指的是什么呢?

CAP定理

CAP定理是由加州大学伯克利分校Eric Brewer教授提出来的,他指出WEB服务无法同时满足一下3个属性:

  • 一致性(Consistency) : 客户端知道一系列的操作都会同时发生(生效)
  • 可用性(Availability) : 每个操作都必须以可预期的响应结束
  • 分区容错性(Partition tolerance) : 即使出现单个组件无法可用,操作依然可以完成

具体地讲在分布式系统中,在任何数据库设计中,一个Web应用至多只能同时支持上面的两个属性。显然,任何横向扩展策略都要依赖于数据分区。因此,设计人员必须在一致性与可用性之间做出选择。

这个定理在迄今为止的分布式系统中都是适用的! 为什么这么说呢?

这个时候有同学可能会把数据库的2PC(两阶段提交)搬出来说话了。OK,我们就来看一下数据库的两阶段提交。

对数据库分布式事务有了解的同学一定知道数据库支持的2PC,又叫做 XA Transactions。

MySQL从5.5版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。

其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:

  • 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
  • 第二阶段:事务协调器要求每个数据库提交数据。

其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。这样做的缺陷是什么呢? 咋看之下我们可以在数据库分区之间获得一致性。

如果CAP 定理是对的,那么它一定会影响到可用性。

如果说系统的可用性代表的是执行某项操作相关所有组件的可用性的和。那么在两阶段提交的过程中,可用性就代表了涉及到的每一个数据库中可用性的和。我们假设两阶段提交的过程中每一个数据库都具有99.9%的可用性,那么如果两阶段提交涉及到两个数据库,这个结果就是99.8%。根据系统可用性计算公式,假设每个月43200分钟,99.9%的可用性就是43157分钟, 99.8%的可用性就是43114分钟,相当于每个月的宕机时间增加了43分钟。

以上,可以验证出来,CAP定理从理论上来讲是正确的,CAP我们先看到这里,等会再接着说。

BASE理论

在分布式系统中,我们往往追求的是可用性,它的重要程序比一致性要高,那么如何实现高可用性呢? 前人已经给我们提出来了另外一个理论,就是BASE理论,它是用来对CAP定理进行进一步扩充的。BASE理论指的是:

  • Basically Available(基本可用)
  • Soft state(软状态)
  • Eventually consistent(最终一致性)

BASE理论是对CAP中的一致性和可用性进行一个权衡的结果,理论的核心思想就是:我们无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

有了以上理论之后,我们来看一下分布式事务的问题。

分布式事务

在分布式系统中,要实现分布式事务,无外乎那几种解决方案。

强一致性

一、两阶段提交(2PC)

和上一节中提到的数据库XA事务一样,两阶段提交就是使用XA协议的原理,我们可以从下面这个图的流程来很容易的看出中间的一些比如commit和abort的细节。
1

两阶段提交这种解决方案属于牺牲了一部分可用性来换取的一致性。在实现方面,在 .NET 中,可以借助 TransactionScop 提供的 API 来编程实现分布式系统中的两阶段提交,比如WCF中就有实现这部分功能。不过在多服务器之间,需要依赖于DTC来完成事务一致性,Windows下微软搞的有MSDTC服务,Linux下就比较悲剧了。

另外说一句,TransactionScop 默认不能用于异步方法之间事务一致,因为事务上下文是存储于当前线程中的,所以如果是在异步方法,需要显式的传递事务上下文。

优点: 尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域。(其实也不能100%保证强一致)

缺点: 实现复杂,牺牲了可用性,对性能影响较大,不适合高并发高性能场景,如果分布式系统跨接口调用,目前 .NET 界还没有实现方案。

二、补偿事务(TCC)

TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

  • Try 阶段主要是对业务系统做检测及资源预留

  • Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。

  • Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假入 Bob 要向 Smith 转账,思路大概是:
我们有一个本地方法,里面依次调用
1、首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
2、在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。
3、如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。

优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些

缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

三、Sagas 事务模型

Saga事务模型又叫做长时间运行的事务(Long-running-transaction), 它是由普林斯顿大学的H.Garcia-Molina等人提出,它描述的是另外一种在没有两阶段提交的的情况下解决分布式系统中复杂的业务事务问题。你可以在这里看到 Sagas 相关论文。

我们这里说的是一种基于 Sagas 机制的工作流事务模型,这个模型的相关理论目前来说还是比较新的,以至于百度上几乎没有什么相关资料。

该模型其核心思想就是拆分分布式系统中的长事务为多个短事务,或者叫多个本地事务,然后由 Sagas 工作流引擎负责协调,如果整个流程正常结束,那么就算是业务成功完成,如果在这过程中实现失败,那么Sagas工作流引擎就会以相反的顺序调用补偿操作,重新进行业务回滚。

比如我们一次关于购买旅游套餐业务操作涉及到三个操作,他们分别是预定车辆,预定宾馆,预定机票,他们分别属于三个不同的远程接口。可能从我们程序的角度来说他们不属于一个事务,但是从业务角度来说是属于同一个事务的。
2

他们的执行顺序如上图所示,所以当发生失败时,会依次进行取消的补偿操作。

因为长事务被拆分了很多个业务流,所以 Sagas 事务模型最重要的一个部件就是工作流或者你也可以叫流程管理器(Process Manager),工作流引擎和Process Manager虽然不是同一个东西,但是在这里,他们的职责是相同的。在选择工作流引擎之后,最终的代码也许看起来是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
SagaBuilder saga = SagaBuilder.newSaga("trip")
.activity("Reserve car", ReserveCarAdapter.class)
.compensationActivity("Cancel car", CancelCarAdapter.class)
.activity("Book hotel", BookHotelAdapter.class)
.compensationActivity("Cancel hotel", CancelHotelAdapter.class)
.activity("Book flight", BookFlightAdapter.class)
.compensationActivity("Cancel flight", CancelFlightAdapter.class)
.end()
.triggerCompensationOnAnyError();

camunda.getRepositoryService().createDeployment()
.addModelInstance(saga.getModel())
.deploy();

这里有一个 C# 相关示例,有兴趣的同学可以看一下。

优缺点这里我们就不说了,因为这个理论比较新,目前市面上还没有什么解决方案,即使是 Java 领域,我也没有搜索的太多有用的信息。

最终一致性(BASE理论)

一、本地消息表(异步确保)

本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。我们可以从下面的流程图中看出其中的一些细节:
3

基本思路就是:

消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

这种方案遵循BASE理论,采用的是最终一致性,笔者认为是这几种方案里面比较适合实际业务场景的,即不会出现像2PC那样复杂的实现(当调用链很长的时候,2PC的可用性是非常低的),也不会像TCC那样可能出现确认或者回滚不了的情况。

优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。

缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

二、MQ 事务消息

有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。

以阿里的 RocketMQ 中间件为例,其思路大致为:

第一阶段Prepared消息,会拿到消息的地址。
第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。

也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
4

遗憾的是,RocketMQ并没有 .NET 客户端。有关 RocketMQ的更多消息,大家可以查看这篇博客

优点: 实现了最终一致性,不需要依赖本地数据库事务。

缺点: 实现难度大,主流MQ不支持,没有.NET客户端,RocketMQ事务消息部分代码也未开源。

分布式限流方案

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹;而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开;而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流。

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。

一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

先有缓存这个银弹,后有限流来应对618、双十一高并发流量,在处理高并发问题上可以说是如虎添翼,不用担心瞬间流量导致系统挂掉或雪崩,最终做到有损服务而不是不服务;限流需要评估好,不可乱用,否则会正常流量出现一些奇怪的问题而导致用户抱怨。

在实际应用时也不要太纠结算法问题,因为一些限流算法实现是一样的只是描述不一样;具体使用哪种限流技术还是要根据实际场景来选择,不要一味去找最佳模式,白猫黑猫能解决问题的就是好猫。

因在实际工作中遇到过许多人来问如何进行限流,因此本文会详细介绍各种限流手段。那么接下来我们从限流算法、应用级限流、分布式限流、接入层限流来详细学习下限流技术手段。

限流算法

计数器法

计 数器法是限流算法里最简单也是最容易实现的一种算法。比如我们规定,对于A接口来说,我们1分钟的访问次数不能超过100个。那么我们可以这么做:在一开 始的时候,我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个 请求的间隔时间还在1分钟之内,那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter。
这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题。
假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。

聪明的朋友可能已经看出来了,刚才的问题其实是因为我们统计的精度太低。那么如何很好地处理这个问题呢?或者说,如何将临界问题的影响降低呢?我们可以看下面的滑动窗口算法。

滑动窗口
滑动窗口,又称rolling window。为了解决这个问题,我们引入了滑动窗口算法。如果学过TCP网络协议的话,那么一定对滑动窗口这个名词不会陌生。
在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进行划分,比如图中,我们就将滑动窗口 划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。

那么滑动窗口怎么解决刚才的临界问题的呢?我们可以看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格 子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触 发了限流。

我再来回顾一下刚才的计数器算法,我们可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。

由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

漏桶算法
漏桶算法,又称leaky bucket。
1

从图中我们可以看到,整个算法其实十分简单。首先,我们有一个固定容量的桶,有水流进来,也有水流出去。对于流进来的水来说,我们无法预计一共有多 少水会流进来,也无法预计水流的速度。但是对于流出去的水来说,这个桶可以固定水流出的速率。而且,当桶满了之后,多余的水将会溢出。

我们将算法中的水换成实际应用中的请求,我们可以看到漏桶算法天生就限制了请求的速度。当使用了漏桶算法,我们可以保证接口会以一个常速速率来处理请求。所以漏桶算法天生不会出现临界问题。具体的伪代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LeakyDemo {
public long timeStamp = getNowTime();

public int capacity; // 桶的容量

public int rate; // 水漏出的速度

public int water; // 当前水量(当前累积请求数)

public boolean grant() {

long now = getNowTime();

water = max(0, water - (now - timeStamp) * rate); // 先执行漏水,计算剩余水量

timeStamp = now;

if ((water + 1) < capacity) {

// 尝试加水,并且水还未满

water += 1;

return true;
}
else {
// 水满,拒绝加水
return false;
}
}
}

令牌桶算法

令牌桶算法,又称token bucket。
2
从图中我们可以看到,令牌桶算法比漏桶算法稍显复杂。首先,我们有一个固定容量的桶,桶里存放着令牌(token)。桶一开始是空的,token以 一个固定的速率r往桶里填充,直到达到桶的容量,多余的令牌将会被丢弃。每当一个请求过来时,就会尝试从桶里移除一个令牌,如果没有令牌的话,请求无法通 过。

具体的伪代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TokenBucketDemo {

public long timeStamp = getNowTime();

public int capacity; // 桶的容量

public int rate; // 令牌放入速度

public int tokens; // 当前令牌数量

public boolean grant() {

long now = getNowTime();

// 先添加令牌

tokens = min(capacity, tokens + (now - timeStamp) * rate);

timeStamp = now;

if (tokens < 1) {

// 若不到1个令牌,则拒绝

return false;
}
else {
// 还有令牌,领取令牌

tokens -= 1;
return true;
}
}
}

临界问题

我 们再来考虑一下临界问题的场景。在0:59秒的时候,由于桶内积满了100个token,所以这100个请求可以瞬间通过。但是由于token是以较低的 速率填充的,所以在1:00的时候,桶内的token数量不可能达到100个,那么此时不可能再有100个请求通过。所以令牌桶算法可以很好地解决临界问 题。
总结

计数器 VS 滑动窗口

计数器算法是最简单的算法,可以看成是滑动窗口的低精度实现。滑动窗口由于需要存储多份的计数器(每一个格子存一份),所以滑动窗口在实现上需要更多的存储空间。也就是说,如果滑动窗口的精度越高,需要的存储空间就越大。

漏桶算法 VS 令牌桶算法

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;

  • 漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;

  • 令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;

  • 漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;

  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率;

  • 两个算法实现可以一样,但是方向是相反的,对于相同的参数得到的限流效果是一样的。

另外有时候我们还使用计数器来进行限流,主要用来限制总并发数,比如数据库连接池、线程池、秒杀的并发数;只要全局总请求数或者一定时间段的总请求数设定的阀值则进行限流,是简单粗暴的总数量限流,而不是平均速率限流。

到此基本的算法就介绍完了,接下来我们首先看看应用级限流。

应用级限流

限流总并发/连接/请求数

对于一个应用系统来说一定会有极限并发/请求数,即总有一个TPS/QPS阀值,如果超了阀值则系统就会不响应用户请求或响应的非常慢,因此我们最好进行过载保护,防止大量请求涌入击垮系统。

如果你使用过Tomcat,其Connector 其中一种配置有如下几个参数:

acceptCount:如果Tomcat的线程都忙于响应,新来的连接会进入队列排队,如果超出排队大小,则拒绝连接;

maxConnections: 瞬时最大连接数,超出的会排队等待;

maxThreads:Tomcat能启动用来处理请求的最大线程数,如果请求处理量一直远远大于最大线程数则可能会僵死。

详细的配置请参考官方文档。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都会有类似的限制连接数的配置。

限流总资源数

如果有的资源是稀缺资源(如数据库连接、线程),而且可能有多个系统都会去使用它,那么需要限制应用;可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是100,那么本应用最多可以使用100个资源,超出了可以等待或者抛异常。

限流某个接口的总并发/**请求数**

如果接口可能会有突发访问情况,但又担心访问量太大造成崩溃,如抢购业务;这个时候就需要限制这个接口的总并发/请求数总请求数了;因为粒度比较细,可以为每个接口都设置相应的阀值。可以使用Java中的AtomicLong进行限流:

try {
if(atomic.incrementAndGet() > 限流数) {
//拒绝请求
}
//处理请求
} finally {
atomic.decrementAndGet();
}

适合对业务无损的服务或者需要过载保护的服务进行限流,如抢购业务,超出了大小要么让用户排队,要么告诉用户没货了,对用户来说是可以接受的。而一些开放平台也会限制用户调用某个接口的试用请求量,也可以用这种计数器方式实现。这种方式也是简单粗暴的限流,没有平滑处理,需要根据实际情况选择使用;

限流某个接口的时间窗请求数

即一个时间窗口内的请求数,如想限制某个接口/服务每秒/每分钟/每天的请求数/调用量。如一些基础服务会被很多其他系统调用,比如商品详情页服务会调用基础商品服务调用,但是怕因为更新量比较大将基础服务打挂,这时我们要对每秒/每分钟的调用量进行限速;一种实现方式如下所示:

LoadingCache<Long, AtomicLong> counter =
CacheBuilder.newBuilder()
.expireAfterWrite(2, TimeUnit.SECONDS)
.build(new CacheLoader<Long, AtomicLong>() {
@Override
public AtomicLong load(Long seconds) throws Exception {
return new AtomicLong(0);
}
});
long limit = 1000;
while(true) {
//__得到当前秒 long currentSeconds = System.currentTimeMillis() / 1000;
if(counter.get(currentSeconds).incrementAndGet() > limit) {
System.out.println(“**限流了:” + currentSeconds); continue**;
}
//__业务处理
}

我们使用Guava的Cache来存储计数器,过期时间设置为2秒(保证1秒内的计数器是有的),然后我们获取当前时间戳然后取秒数来作为KEY进行计数统计和限流,这种方式也是简单粗暴,刚才说的场景够用了。

平滑限流某个接口的请求数

之前的限流方式都不能很好地应对突发请求,即瞬间请求可能都被允许从而导致一些问题;因此在一些场景中需要对突发请求进行整形,整形为平均速率请求处理(比如5r/s,则每隔200毫秒处理一个请求,平滑了速率)。这个时候有两种算法满足我们的场景:令牌桶和漏桶算法。Guava框架提供了令牌桶算法实现,可直接拿来使用。

Guava RateLimiter提供了令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现。

到此应用级限流的一些方法就介绍完了。假设将应用部署到多台机器,应用级限流方式只是单应用内的请求限流,不能进行全局限流。因此我们需要分布式限流和接入层限流来解决这个问题。

分布式限流

分布式限流最关键的是要将限流服务做成原子化,而解决方案可以使使用redis+lua或者nginx+lua技术进行实现,通过这两种技术可以实现的高并发和高性能。

首先我们来使用redis+lua实现时间窗内某个接口的请求数限流,实现了该功能后可以改造为限流总并发/请求数和限制总资源数。Lua本身就是一种编程语言,也可以使用它实现复杂的令牌桶或漏桶算法。

redis+lua实现中的lua脚本:

1
2
3
4
5
6
7
8
9
local key = KEYS[1] --限流KEY(一秒一个)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call("INCRBY", key, "1")) --请求数+1
if current > limit then --如果超出限流大小
return 0
elseif current == 1 then --只有第一次访问需要设置2秒的过期时间
redis.call("expire", key,"2")
end
return 1

如上操作因是在一个lua脚本中,又因Redis是单线程模型,因此是线程安全的。如上方式有一个缺点就是当达到限流大小后还是会递增的,可以改造成如下方式实现:

1
2
3
4
5
6
7
8
9
10
local key = KEYS[1] --限流KEY(一秒一个)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then --如果超出限流大小
return 0
else --请求数+1,并设置2秒过期
redis.call("INCRBY", key,"1")
redis.call("expire", key,"2")
return 1
end

如下是Java中判断是否需要限流的代码:

1
2
3
4
5
6
7
public static boolean acquire() throws Exception {
String luaScript = Files.toString(new File("limit.lua"), Charset.defaultCharset());
Jedis jedis = new Jedis("192.168.147.52", 6379);
String key = "ip:" + System.currentTimeMillis()/ 1000; //此处将当前时间戳取秒数
Stringlimit = "3"; //限流大小
return (Long)jedis.eval(luaScript,Lists.newArrayList(key), Lists.newArrayList(limit)) == 1;
}

因为Redis的限制(Lua中有写操作不能使用带随机性质的读操作,如TIME)不能在Redis Lua中使用TIME获取时间戳,因此只好从应用获取然后传入,在某些极端情况下(机器时钟不准的情况下),限流会存在一些小问题。

使用Nginx+Lua实现的Lua脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local locks = require "resty.lock"
local function acquire()
local lock =locks:new("locks")
local elapsed, err =lock:lock("limit_key") --互斥锁
local limit_counter =ngx.shared.limit_counter --计数器
local key = "ip:" ..os.time()
local limit = 5 --限流大小
local current =limit_counter:get(key)

if current ~= nil and current + 1> limit then --如果超出限流大小
lock:unlock()
return 0
end
if current == nil then
limit_counter:set(key, 1, 1) --第一次需要设置过期时间,设置key的值为1,过期时间为1秒
else
limit_counter:incr(key, 1) --第二次开始加1即可
end
lock:unlock()
return 1
end
ngx.print(acquire())

实现中我们需要使用lua-resty-lock互斥锁模块来解决原子性问题(在实际工程中使用时请考虑获取锁的超时问题),并使用ngx.shared.DICT共享字典来实现计数器。如果需要限流则返回0,否则返回1。使用时需要先定义两个共享字典(分别用来存放锁和计数器数据):

1
2
3
4
5
http {  
……
lua_shared_dict locks 10m;
lua_shared_dict limit_counter 10m;
}

有人会纠结如果应用并发量非常大那么redis或者nginx是不是能抗得住;不过这个问题要从多方面考虑:你的流量是不是真的有这么大,是不是可以通过一致性哈希将分布式限流进行分片,是不是可以当并发量太大降级为应用级限流;对策非常多,可以根据实际情况调节;像在京东使用Redis+Lua来限流抢购流量,一般流量是没有问题的。

对于分布式限流目前遇到的场景是业务上的限流,而不是流量入口的限流;流量入口限流应该在接入层完成,而接入层笔者一般使用Nginx。

接入层限流

接入层通常指请求流量的入口,该层的主要目的有:负载均衡、非法请求过滤、请求聚合、缓存、降级、限流、A/B测试、服务质量监控等等,可以参考笔者写的《使用Nginx+Lua(OpenResty)开发高性能Web应用》。

对于Nginx接入层限流可以使用Nginx自带了两个模块:连接数限流模块ngx_http_limit_conn_module和漏桶算法实现的请求限流模块ngx_http_limit_req_module。还可以使用OpenResty提供的Lua限流模块lua-resty-limit-traffic进行更复杂的限流场景。

limit_conn用来对某个KEY对应的总的网络连接数进行限流,可以按照如IP、域名维度进行限流。limit_req用来对某个KEY对应的请求的平均速率进行限流,并有两种用法:平滑模式(delay)和允许突发模式(nodelay)。

ngx_http_limit_conn_module

limit_conn是对某个KEY对应的总的网络连接数进行限流。可以按照IP来限制IP维度的总连接数,或者按照服务域名来限制某个域名的总连接数。但是记住不是每一个请求连接都会被计数器统计,只有那些被Nginx处理的且已经读取了整个请求头的请求连接才会被计数器统计。

配置示例:

1
2
3
4
5
6
7
8
9
10
http {
limit_conn_zone$binary_remote_addr zone=addr:10m;
limit_conn_log_level error;
limit_conn_status 503;
...
server {
...
location /limit {
limit_conn addr 1;
}

limit_conn:要配置存放KEY和计数器的共享内存区域和指定KEY的最大连接数;此处指定的最大连接数是1,表示Nginx最多同时并发处理1个连接;

limit_conn_zone:用来配置限流KEY、及存放KEY对应信息的共享内存区域大小;此处的KEY是“$binary_remote_addr”其表示IP地址,也可以使用如$server_name作为KEY来限制域名级别的最大连接数;

limit_conn_status:配置被限流后返回的状态码,默认返回503;

limit_conn_log_level:配置记录被限流后的日志级别,默认error级别。

limit_conn的主要执行过程如下所示:

1、请求进入后首先判断当前limit_conn_zone中相应KEY的连接数是否超出了配置的最大连接数;

2.1、如果超过了配置的最大大小,则被限流,返回limit_conn_status定义的错误状态码;

2.2、否则相应KEY的连接数加1,并注册请求处理完成的回调函数;

3、进行请求处理;

4、在结束请求阶段会调用注册的回调函数对相应KEY的连接数减1。

limt_conn可以限流某个KEY的总并发/请求数,KEY可以根据需要变化。

按照IP**限制并发连接数配置示例:**

首先定义IP维度的限流区域:

limit_conn_zone $binary_remote_addrzone=perip:10m;

接着在要限流的location中添加限流逻辑:

location /limit {
limit_conn perip 2;
echo “123”;
}

即允许每个IP最大并发连接数为2。

使用AB测试工具进行测试,并发数为5个,总的请求数为5个:

ab -n 5 -c 5 http://localhost/limit

将得到如下access.log输出:

1
2
3
4
5
6
7
8
9
[08/Jun/2016:20:10:51+0800] [1465373451.802] 200

[08/Jun/2016:20:10:51+0800] [1465373451.803] 200

[08/Jun/2016:20:10:51 +0800][1465373451.803] 503

[08/Jun/2016:20:10:51 +0800][1465373451.803] 503

[08/Jun/2016:20:10:51 +0800][1465373451.803] 503

此处我们把access log格式设置为log_format main ‘[$time_local] [$msec] $status’;分别是“日期 日期秒/毫秒值 响应状态码”。

如果被限流了,则在error.log中会看到类似如下的内容:

2016/06/08 20:10:51 [error] 5662#0: *5limiting connections by zone “perip”, client: 127.0.0.1, server: _,request: “GET /limit HTTP/1.0”, host: “localhost”

按照域名限制并发连接数配置示例:

首先定义域名维度的限流区域:

limit_conn_zone $ server_name zone=perserver:10m;

接着在要限流的location中添加限流逻辑:

1
2
3
4
location /limit {
limit_conn perserver 2;
echo "123";
}

即允许每个域名最大并发请求连接数为2;这样配置可以实现服务器最大连接数限制。

ngx_http_limit_req_module

limit_req是漏桶算法实现,用于对指定KEY对应的请求进行限流,比如按照IP维度限制请求速率。

配置示例:

1
2
3
4
5
6
7
8
9
10
http {
limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
limit_conn_log_level error;
limit_conn_status 503;
...
server {
...
location /limit {
limit_req zone=one burst=5 nodelay;
}

limit_req:配置限流区域、桶容量(突发容量,默认0)、是否延迟模式(默认延迟);

limit_req_zone:配置限流KEY、及存放KEY对应信息的共享内存区域大小、固定请求速率;此处指定的KEY是“$binary_remote_addr”表示IP地址;固定请求速率使用rate参数配置,支持10r/s和60r/m,即每秒10个请求和每分钟60个请求,不过最终都会转换为每秒的固定请求速率(10r/s为每100毫秒处理一个请求;60r/m,即每1000毫秒处理一个请求)。

limit_conn_status:配置被限流后返回的状态码,默认返回503;

limit_conn_log_level:配置记录被限流后的日志级别,默认error级别。

limit_req的主要执行过程如下所示:

1、请求进入后首先判断最后一次请求时间相对于当前时间(第一次是0)是否需要限流,如果需要限流则执行步骤2,否则执行步骤3;

2.1、如果没有配置桶容量(burst),则桶容量为0;按照固定速率处理请求;如果请求被限流,则直接返回相应的错误码(默认503);

2.2、如果配置了桶容量(burst>0)且延迟模式(没有配置nodelay);如果桶满了,则新进入的请求被限流;如果没有满则请求会以固定平均速率被处理(按照固定速率并根据需要延迟处理请求,延迟使用休眠实现);

2.3、如果配置了桶容量(burst>0)且非延迟模式(配置了nodelay);不会按照固定速率处理请求,而是允许突发处理请求;如果桶满了,则请求被限流,直接返回相应的错误码;

3、如果没有被限流,则正常处理请求;

4、Nginx会在相应时机进行选择一些(3个节点)限流KEY进行过期处理,进行内存回收。

这些算法中有些允许突发,有些会整形为平滑,有些计算算法简单粗暴;其中令牌桶算法和漏桶算法实现上是类似的,只是表述的方向不太一样,对于业务来说不必刻意去区分它们;因此需要根据实际场景来决定如何限流,最好的算法不一定是最适用的。

springboot设置@configuration的加载顺序

1、简介

Spring Boot会检查你发布的jar中是否存在META-INF/spring.factories文件,该文件中以EnableAutoConfiguration为key的属性应该列出你的配置类:

1
2
3
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.mycorp.libx.autoconfigure.LibXAutoConfiguration,\
com.mycorp.libx.autoconfigure.LibXWebAutoConfiguration

你可以使用[@AutoConfigureAfter]或[@AutoConfigureBefore]注解为配置类指定特定的顺序。例如,如果你提供web-specific配置,你的类就需要应用在WebMvcAutoConfiguration后面。

你也可以使用@AutoconfigureOrder注解为那些相互不知道存在的自动配置类提供排序,该注解语义跟常规的@Order注解相同,但专为自动配置类提供顺序。

注:自动配置类只能通过这种方式加载,确保它们定义在一个特殊的package中,特别是不能成为组件扫描的目标。

2、例子

例如下面的项目,想要保证加载BananaConf.java后再加载AppleConf.java
微信截图_20190616124419

1)添加文件spring.factories(若没有META-INF文件夹,则需要先添加)
2)在spring.factories中加入

1
2
3
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.demo.conf.AppleConf,\
com.example.demo.conf.BananaConf

3)AppleConf中添加 @AutoConfigureAfter(BananaConf.class)

1
2
3
4
5
6
@Configuration
@AutoConfigureAfter(BananaConf.class)
public class AppleConf
{
xxx
}

BananaConf正常配置即可

1
2
3
4
5
@Configuration
public class BananaConf
{
xxx
}

注意:在spring.factories注册了的配置类,可以不用再该配置类写@Configuration

spring boot factories扩展机制

写在前面:Spring Boot中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照Java中的SPI扩展机制来实现的。

什么是 SPI机制

SPI的全名为Service Provider Interface.大多数开发人员可能不熟悉,因为这个是针对厂商或者插件的。在java.util.ServiceLoader的文档里有比较详细的介绍。
简单的总结下java SPI机制的思想。我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制。
java SPI就是提供这样的一个机制:为某个接口寻找服务实现的机制。有点类似IOC的思想,就是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要。

Spring Boot中的SPI机制

在Spring中也有一种类似与Java SPI的加载机制。它在META-INF/spring.factories文件中配置接口的实现类名称,然后在程序中读取这些配置文件并实例化。
这种自定义的SPI机制是Spring Boot Starter实现的基础。
1

Spring Factories实现原理是什么

spring-core包里定义了SpringFactoriesLoader类,这个类实现了检索META-INF/spring.factories文件,并获取指定接口的配置的功能。在这个类中定义了两个对外的方法:

loadFactories 根据接口类获取其实现类的实例,这个方法返回的是对象列表。
loadFactoryNames 根据接口获取其接口类的名称,这个方法返回的是类名的列表。

上面的两个方法的关键都是从指定的ClassLoader中获取spring.factories文件,并解析得到类名列表,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static List<String> loadFactoryNames(Class<?> factoryClass, ClassLoader classLoader) {
String factoryClassName = factoryClass.getName();
try {
Enumeration<URL> urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :
ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));
List<String> result = new ArrayList<String>();
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url));
String factoryClassNames = properties.getProperty(factoryClassName);
result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames)));
}
return result;
}
catch (IOException ex) {
throw new IllegalArgumentException("Unable to load [" + factoryClass.getName() +
"] factories from location [" + FACTORIES_RESOURCE_LOCATION + "]", ex);
}
}

从代码中我们可以知道,在这个方法中会遍历整个ClassLoader中所有jar包下的spring.factories文件。也就是说我们可以在自己的jar中配置spring.factories文件,不会影响到其它地方的配置,也不会被别人的配置覆盖。

spring.factories的是通过Properties解析得到的,所以我们在写文件中的内容都是安装下面这种方式配置的:

com.xxx.interface=com.xxx.classname

如果一个接口希望配置多个实现类,可以使用’,’进行分割。

Spring Factories在Spring Boot中的应用
在Spring Boot的很多包中都能够找到spring.factories文件,接下来我们以spring-boot包为例进行介绍
2

在日常工作中,我们可能需要实现一些SDK或者Spring Boot Starter给被人使用时,
我们就可以使用Factories机制。Factories机制可以让SDK或者Starter的使用只需要很少或者不需要进行配置,只需要在服务中引入我们的jar包即可。

springboot配置加载顺序

使用 Spring Boot 会涉及到各种各样的配置,如开发、测试、线上就至少 3 套配置信息了。Spring Boot 可以轻松的帮助我们使用相同的代码就能使开发、测试、线上环境使用不同的配置。

在 Spring Boot 里面,可以使用以下几种方式来加载配置。本章内容基于 Spring Boot 2.0 进行详解。

1、properties文件;

2、YAML文件;

3、系统环境变量;

4、命令行参数;

等等……

我们可以在 Spring Beans 里面直接使用这些配置文件中加载的值,如:

1、使用 @Value 注解直接注入对应的值,这能获取到 Spring 中 Environment 的值;

2、使用 @ConfigurationProperties 注解把对应的值绑定到一个对象;

3、直接获取注入 Environment 进行获取;

配置属性的方式很多,Spring boot使用了一种独有的 PropertySource 可以很方便的覆盖属性的值。
配置属性加载的顺序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1、开发者工具 `Devtools` 全局配置参数;

2、单元测试上的 `@TestPropertySource` 注解指定的参数;

3、单元测试上的 `@SpringBootTest` 注解指定的参数;

4、命令行指定的参数,如 `java -jar springboot.jar --name="Java技术栈"`;

5、命令行中的 `SPRING_APPLICATION_JSONJSON` 指定参数, 如 `java -Dspring.application.json='{"name":"Java技术栈"}' -jar springboot.jar`

6、`ServletConfig` 初始化参数;

7、`ServletContext` 初始化参数;

8、JNDI参数(如 `java:comp/env/spring.application.json`);

9、Java系统参数(来源:`System.getProperties()`);

10、操作系统环境变量参数;

11、`RandomValuePropertySource` 随机数,仅匹配:`ramdom.*`;

12、JAR包外面的配置文件参数(`application-{profile}.properties(YAML)`)

13、JAR包里面的配置文件参数(`application-{profile}.properties(YAML)`)

14、JAR包外面的配置文件参数(`application.properties(YAML)`)

15、JAR包里面的配置文件参数(`application.properties(YAML)`)

16、`@Configuration`配置文件上 `@PropertySource` 注解加载的参数;

17、默认参数(通过 `SpringApplication.setDefaultProperties` 指定);

数字小的优先级越高,即数字小的会覆盖数字大的参数值,我们来实践下,验证以上配置参数的加载顺序。

1、在主应用程序中添加 Java 系统参数。

1
2
3
4
5
6
@Bean
public CommandLineRunner commandLineRunner() {
return (args) -> {
System.setProperty("name", "javastack-system-properties");
};
}

2、在 application.properties 文件中添加属性。

1
name = javastack-application

3、在 application-dev.properties 文件中添加属性。

1
name = javastack-application-dev

4、添加测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RunWith(SpringRunner.class)
@SpringBootTest(value = { "name=javastack-test", "sex=1" })
@ActiveProfiles("dev")
public class SpringBootBestPracticeApplicationTests {

@Value("${name}")
private String name;

@Test
public void test() {
System.out.println("name is " + name);
}

}

运行 test 单元测试,程序输出:

1
name is javastack-test

根据以上参数动态调整,发现参数会被正确被覆盖。了解了 Spring Boot 各种配置的加载顺序,如果配置被覆盖了我们就知道是什么问题了。