flow(ExecutorJob)流程

  1. CustomerJourneyExecutorJob.doWork
    根据body转化出customerJourneyContext,然后调用
  2. workflowTrigger.trigger
    调用doTrigger,得到(context.getTriggerId(), processDefId, processVariables)调用
  3. workflowManager.run(内部为acticiti的runtimeService启动流程)返回processInstanceId,生成CustomerJourneyInstance并入库

action的逻辑实现调用部分

看bpmn的xml发现,serviceTask 节点内有activiti:delegateExpression的信息,值为#{TriggerDelegateDispatcher},#{ActionDelegateDispatcher}联想到spring代码中的delegate,查看代码发现抽象BaseDispatcher的实现有ActionDelegateDispatcher和TriggerDelegateDispatcher证实了想法。

ActionDelegateDispatcher的executeDelegate方法根据customerJourneyProperty.getEvent()得到beanname,从而得到AbstractDelegate的具体实现的bean对象,最终执行delegate.executeDelegate方法,实现自实现的逻辑(每个action的实现逻辑即在此进行调用和实现)

1
<?xml version='1.0' encoding='UTF-8'?> <definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/processdef"> <process id="P9fbf8335-9f0a-4c8e-a1bf-a6c4ec856c1a" isExecutable="true"> <startEvent id="START"/> <sequenceFlow sourceRef="START" targetRef="N-8f2e9577-46e5-4833-8cb6-e0622d531496"/> <serviceTask id="N-8f2e9577-46e5-4833-8cb6-e0622d531496" name="微信" activiti:delegateExpression="#{TriggerDelegateDispatcher}"> <extensionElements> <activiti:field name="parameterMap"> <activiti:string><![CDATA[{"triggerHistoryEvent":false,"channelId":"wx1f7fbd28ac009cd0","conditions":"ANY"}]]></activiti:string> </activiti:field> </extensionElements> </serviceTask> <endEvent id="END-N-d3219b76-e9c7-4a5b-b59e-5342c92f8548"/> <sequenceFlow id="END-N-d3219b76-e9c7-4a5b-b59e-5342c92f8548-flow" sourceRef="N-d3219b76-e9c7-4a5b-b59e-5342c92f8548" targetRef="END-N-d3219b76-e9c7-4a5b-b59e-5342c92f8548"/> <serviceTask id="N-d3219b76-e9c7-4a5b-b59e-5342c92f8548" name="微信" activiti:delegateExpression="#{ActionDelegateDispatcher}"> <extensionElements> <activiti:field name="parameterMap"> <activiti:string><![CDATA[{"triggerHistoryEvent":false,"channelId":"wx1f7fbd28ac009cd0","replyMessage":{"touser":"#{OPENID}","text":{"content":"近期活动 Upcoming events:\n\n2018/11/28 <a href=\" \" data-miniprogram-appid=\"wxf1d37b3137043a00\">报名 Register</a >\n荟同开讲 | 双语教育的模型与实践\nWhittle Talks | Bilingual Models and Practices\n\n2018/12/08 <a href=\"pages/order/order?event_id=82\" data-miniprogram-appid=\"wxf1d37b3137043a00\">报名 Register</a >\n荟同开讲 | 小童书的大说法\nWhittle Talks | The big story behind a book for small children\n\n更多精彩活动,请<a href=\"pages/calendar/calendar\" data-miniprogram-appid=\"wxf1d37b3137043a00\">查看活动日历</a >\n\nFind out more in our <a href=\"pages/calendar/calendar\" data-miniprogram-appid=\"wxf1d37b3137043a00\"> Event Calendar</a >"}},"resultVariable":"gatewayc58"}]]></activiti:string> </activiti:field> </extensionElements> </serviceTask> <sequenceFlow id="N-7f0dcb94-be72-4999-84dc-256fa65d9fd9" sourceRef="N-8f2e9577-46e5-4833-8cb6-e0622d531496" targetRef="N-d3219b76-e9c7-4a5b-b59e-5342c92f8548"/> </process> <bpmndi:BPMNDiagram id="BPMNDiagram_P9fbf8335-9f0a-4c8e-a1bf-a6c4ec856c1a"> <bpmndi:BPMNPlane bpmnElement="P9fbf8335-9f0a-4c8e-a1bf-a6c4ec856c1a" id="BPMNPlane_P9fbf8335-9f0a-4c8e-a1bf-a6c4ec856c1a"/> </bpmndi:BPMNDiagram> </definitions>

当 forEach 需要索引

Java8:当 forEach 需要索引

上一篇文章 中,我们讨论了如何使用 Java8 中 Map 添加的新方法 computeIfAbsent 来统计集合中每个元素出现的所有位置,代码如下:

1
2
3
4
5
6
7
8
9
public static Map<String, List<Integer>> getElementPositions(List<String> list) {
Map<String, List<Integer>> positionsMap = new HashMap<>();

for (int i = 0; i < list.size(); i++) {
positionsMap.computeIfAbsent(list.get(i), k -> new ArrayList<>(1)).add(i);
}

return positionsMap;
}

至少有两点需要探讨:
1、如果 list 不是基于数组的(即不是 RandomAccess 的),而是基于链表的,那么 list.get(int index) 方法的效率就值得思考了;
2、既然都有了 Lambda(即当前平台为 Java8),我们为什么还要一次次去写传统的 for 循环呢?

在 Java8 中,为 Iterable 接口添加了默认的 forEach 方法:

1

很好理解,遍历当前 Iterable 中所有的元素,使用每个元素作为参数调用一次 action。而 Collection 接口继承了 Iterable 接口,所以所有的继承自 Collection 的集合类都可以直接调用 forEach 方法。比如:

1
2
3
4
5
6
7
public static void main(String[] args) throws Exception {
List<String> list = Arrays.asList("a", "b", "b", "c", "c", "c", "d", "d", "d", "f", "f", "g");

list.forEach(str -> System.out.print(str + " "));

System.out.println();
}

运行结果:
2

forEach 运行示例

那如果我们在遍历的时候需要使用到元素的索引呢(类似 getElementPositions 方法那样)?
很可惜,Java8 的 Iterable 并没有提供一个带索引的 forEach 方法。不过自己动手,丰衣足食 —— 让我们自己写一个带索引的 forEach 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.Objects;
import java.util.function.BiConsumer;

/**
* Iterable 的工具类
*/
public class Iterables {

public static <E> void forEach(
Iterable<? extends E> elements, BiConsumer<Integer, ? super E> action) {
Objects.requireNonNull(elements);
Objects.requireNonNull(action);

int index = 0;
for (E element : elements) {
action.accept(index++, element);
}
}
}

forEach 方法第一个参数为要遍历的 Iterable,第二个参数为 BiConsumerBiConsumer 的输入参数第一个即索引,第二个为元素。

我们测试下这个 forEach 方法:

1
2
3
4
5
6
public static void main(String[] args) throws Exception {

List<String> list = Arrays.asList("a", "b", "b", "c", "c", "c", "d", "d", "d", "f", "f", "g");

Iterables.forEach(list, (index, str) -> System.out.println(index + " -> " + str));
}

运行结果:
3

结果和预期的一致。

现在我们使用 Iterables.forEach 改写 getElementPositions 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static Map<String, List<Integer>> getElementPositions(List<String> list) {
Map<String, List<Integer>> positionsMap = new HashMap<>();

Iterables.forEach(list, (index, str) -> {
positionsMap.computeIfAbsent(str, k -> new ArrayList<>(1)).add(index);
});

return positionsMap;
}

public static void main(String[] args) throws Exception {
List<String> list = Arrays.asList("a", "b", "b", "c", "c", "c", "d", "d", "d", "f", "f", "g");

System.out.println("使用 computeIfAbsent 和 Iterable.forEach:");
Map<String, List<Integer>> elementPositions = getElementPositions(list);
System.out.println(elementPositions);
}

运行结果和原来一致:
4

使用 computeIfAbsent 和 Iterable.forEach 的运行结果

真的不明白这么简单且实用的方法,Java8 为什么不在 Iterable 中提供一个默认实现(此处应有黑人问号)。

subList

List接口中定义:

1
List<E> subList(int fromIndex, int toIndex);

英文注释:

1
2
3
4
5
6
7
8
9
10
11
12
Returns a view of the portion of this list between the specified fromIndex, inclusive, and toIndex, exclusive. (If fromIndex and toIndex are equal, the 
returned list is empty.) The returned list is backed by this list, so non-structural changes in the returned list are reflected in this list, and vice-versa.
The returned list supports all of the optional list operations supported by this list.

This method eliminates the need for explicit range operations (of the sort that commonly exist for arrays). Any operation that expects a list can be used as a
range operation by passing a subList view instead of a whole list. For example, the following idiom removes a range of elements from a list:
list.subList(from, to).clear();

Similar idioms may be constructed for indexOf and lastIndexOf, and all of the algorithms in the Collections class can be applied to a subList.
The semantics of the list returned by this method become undefined if the backing list (i.e., this list) is structurally modified in any way other than via
the returned list. (Structural modifications are those that change the size of this list, or otherwise perturb it in such a fashion that iterations in
progress may yield incorrect results.)

根据注释得知:

1,该方法返回的是父list的一个视图,从fromIndex(包含),到toIndex(不包含)。fromIndex=toIndex 表示子list为空

2,父子list做的非结构性修改(non-structural changes)都会影响到彼此:所谓的“非结构性修改”,是指不涉及到list的大小改变的修改。相反,结构性修改,指改变了list大小的修改。

3,对于结构性修改,子list的所有操作都会反映到父list上。但父list的修改将会导致返回的子list失效。

4,tips:如何删除list中的某段数据:

list.subList(from, to).clear();

示例代码:

来自【Java每日一题】20170105,就是看到这个题目才让我知道list的这个方法我没有接触过

1
2
3
4
5
6
7
8
9
10
11
package ques; import java.util.ArrayList; import java.util.List; public class Ques0105 { public static void main(String[] args) {  
List<String> list = new ArrayList<String>();
list.add("a"); // 使用构造器创建一个包含list的列表list1
List<String> list1 = new ArrayList<String>(list); // 使用subList生成与list相同的列表list2
List<String> list2 = list.subList(0, list.size());
list2.add("b");

System.out.println(list.equals(list1));
System.out.println(list.equals(list2));
}
}

返回结果如下:
1

可以发现,list2为list的子list,当list2发生结构性修改(list2.add(“b”))后,list也发生相应改变,所以返回结果为false和true

通配符和边界

<? extends T><? super T>是Java泛型中的“通配符(Wildcards)”“边界(Bounds)”的概念。

  • <? extends T>:是指 “上界通配符(Upper Bounds Wildcards)”
  • <? super T>:是指 “下界通配符(Lower Bounds Wildcards)”

为什么要用通配符和边界?

使用泛型的过程中,经常出现一种很别扭的情况。比如按照题主的例子,我们有Fruit类,和它的派生类Apple类。

class Fruit {} class Apple extends Fruit {}

然后有一个最简单的容器:Plate类。盘子里可以放一个泛型的“东西”。我们可以对这个东西做最简单的“放”和“取”的动作:set( )和get( )方法。

1
2
3
4
5
6
class Plate<T>{ 
private T item;
public Plate(T t){item=t;}
public void set(T t){item=t;}
public T get(){return item;}
}

现在我定义一个“水果盘子”,逻辑上水果盘子当然可以装苹果。

1
Plate<Fruit> p=new Plate<Apple>(new Apple());

但实际上Java编译器不允许这个操作。会报错,“装苹果的盘子”无法转换成“装水果的盘子”。

1
error: incompatible types: Plate<Apple> cannot be converted to Plate<Fruit>

所以我的尴尬症就犯了。实际上,编译器脑袋里认定的逻辑是这样的:

  • 苹果 IS-A 水果
  • 装苹果的盘子 NOT-IS-A 装水果的盘子

所以,就算容器里装的东西之间有继承关系,但容器之间是没有继承关系的。所以我们不可以把Plate的引用传递给Plate。

为了让泛型用起来更舒服,Sun的大脑袋们就想出了<? extends T><? super T>的办法,来让”水果盘子“和”苹果盘子“之间发生关系。

什么是上界?

下面代码就是“上界通配符(Upper Bounds Wildcards)”:

Plate<? extends Fruit>

翻译成人话就是:一个能放水果以及一切是水果派生类的盘子。再直白点就是:啥水果都能放的盘子。这和我们人类的逻辑就比较接近了。Plate<? extends Fruit>Plate<Apple>最大的区别就是:Plate<? extends Fruit>Plate<Fruit>以及Plate<Apple>的基类。直接的好处就是,我们可以用“苹果盘子”给“水果盘子”赋值了。

Plate<? extends Fruit> p=new Plate(new Apple());

如果把Fruit和Apple的例子再扩展一下,食物分成水果和肉类,水果有苹果和香蕉,肉类有猪肉和牛肉,苹果还有两种青苹果和红苹果。

1
2
3
4
5
6
7
8
9
10
11
12
13
//Lev 1
class Food{}
//Lev 2
class Fruit extends Food{}
class Meat extends Food{}
//Lev 3
class Apple extends Fruit{}
class Banana extends Fruit{}
class Pork extends Meat{}
class Beef extends Meat{}
//Lev 4
class RedApple extends Apple{}
class GreenApple extends Apple{}

在这个体系中,下界通配符 Plate<? extends Fruit> 覆盖下图中蓝色的区域。

1

什么是下界?

相对应的,“下界通配符(Lower Bounds Wildcards)”:

Plate<? super Fruit>

表达的就是相反的概念:一个能放水果以及一切是水果基类的盘子。Plate<? super Fruit>Plate<Fruit>的基类,但不是Plate<Apple>的基类。对应刚才那个例子,Plate<? super Fruit>覆盖下图中红色的区域。

2

上下界通配符的副作用

边界让Java不同泛型之间的转换更容易了。但不要忘记,这样的转换也有一定的副作用。那就是容器的部分功能可能失效。

还是以刚才的Plate为例。我们可以对盘子做两件事,往盘子里set()新东西,以及从盘子里get()东西。

class Plate{ private T item; public Plate(T t){item=t;} public void set(T t){item=t;} public T get(){return item;}
}

上界<? extends T>不能往里存,只能往外取

<? extends Fruit>会使往盘子里放东西的set( )方法失效。但取东西get( )方法还有效。比如下面例子里两个set()方法,插入Apple和Fruit都报错。

1
2
3
4
5
6
Plate<? extends Fruit> p=new Plate<Apple>(new Apple()); //不能存入任何元素
p.set(new Fruit()); //Error
p.set(new Apple()); //Error //读取出来的东西只能存放在Fruit或它的基类里。
Fruit newFruit1=p.get();
Object newFruit2=p.get();
Apple newFruit3=p.get(); //Error

原因是编译器只知道容器内是Fruit或者它的派生类,但具体是什么类型不知道。可能是Fruit?可能是Apple?也可能是Banana,RedApple,GreenApple?编译器在看到后面用Plate赋值以后,盘子里没有被标上有“苹果”。而是标上一个占位符:CAP#1,来表示捕获一个Fruit或Fruit的子类,具体是什么类不知道,代号CAP#1。然后无论是想往里插入Apple或者Meat或者Fruit编译器都不知道能不能和这个CAP#1匹配,所以就都不允许。

所以通配符<?>和类型参数的区别就在于,对编译器来说所有的T都代表同一种类型。比如下面这个泛型方法里,三个T都指代同一个类型,要么都是String,要么都是Integer。

public List fill(T… t);

但通配符<?>没有这种约束,Plate<?>单纯的就表示:盘子里放了一个东西,是什么我不知道。

所以题主问题里的错误就在这里,Plate<? extends Fruit>里什么都放不进去。

下界<? super T>不影响往里存,但往外取只能放在Object对象里

使用下界<? super Fruit>会使从盘子里取东西的get( )方法部分失效,只能存放到Object对象里。set( )方法正常。

1
2
3
4
5
6
Plate<? super Fruit> p=new Plate<Fruit>(new Fruit()); //存入元素正常
p.set(new Fruit());
p.set(new Apple()); //读取出来的东西只能存放在Object类里。
Apple newFruit3=p.get(); //Error
Fruit newFruit1=p.get(); //Error
Object newFruit2=p.get();

因为下界规定了元素的最小粒度的下限,实际上是放松了容器元素的类型控制。既然元素是Fruit的基类,那往里存粒度比Fruit小的都可以。但往外读取元素就费劲了,只有所有类的基类Object对象才能装下。但这样的话,元素的类型信息就全部丢失。

PECS原则

最后看一下什么是PECS(Producer Extends Consumer Super)原则,已经很好理解了:

  • 频繁往外读取内容的,适合用上界Extends。
  • 经常往里插入的,适合用下界Super。

Java8(forEach)

java中的集合有两种形式Collection,Map<K,V>

Collection类型集合
在JAVA7中遍历有一下几种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
List<String> list = Arrays.asList("aaaa","bbbb");
//for循环
for (int i=0;i< list.size();i++){
System.out.println(list.get(i));
}

//增强for循环
for(String str : list){
System.out.println(str);
}
//迭代器
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}

在JAVA8中还可以使用forEach来遍历:

1
2
3
4
5
6
list.forEach(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});

发现和使用sort排序的写法是一样的:

1
2
3
4
5
6
Collections.sort(list, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return o1.compareTo(o2);
}
});

发现代码量没有减少多少,那么我们在加上Lambda表达式,如下:

1
list.forEach(str-> System.out.println(str));

sort排序也使用Lambda表达式,如下:

1
Collections.sort(list,(o1,o2)->{return  o1.compareTo(o2);});

Map类型集合
在JAVA7中遍历有下面几种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Map<String,String> map = new HashMap<>();
map.put("id","11");
map.put("name","zhangsan");
map.put("age","30");
for (Map.Entry<String,String> entry : map.entrySet()){
System.out.println("k=" + entry.getKey() + ",v=" + entry.getValue());
}
for (String key : map.keySet()){
System.out.println("k=" + key + ",v=" + map.get(key));
}
Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, String> entry = iterator.next();
System.out.println("k=" + entry.getKey() + ",v=" + entry.getValue());
}
for (String v : map.values()){
System.out.println("v=" + v);
}

使用JAVA8的forEach遍历,如下:

1
2
3
4
5
6
map.forEach(new BiConsumer<String, String>() {
@Override
public void accept(String s, String s2) {
System.out.println("k=" + s + ",v=" + s2);
}
});

也可以使用Lambda表达式,如下:

1
map.forEach((k,v)-> System.out.println("v="+k+",v="+v));

本文中只使用forEach,没有使用JAVA8的Stream包。
注:例子中出现的Consumer和BiConsumer是JAVA8中新增的函数式接口,Consumer接口中只有一个无返回结果并且只有一个泛型参数的方法,源码如下:
1

BiConsumer接口中提供的是一个无返回结果并且有两个泛型参数的方法,源码如下:
2

拦截器和过滤器

GenericFilterBean
https://blog.csdn.net/liangxw1/article/details/51095484
RequestInterceptor
https://www.jianshu.com/p/919d066a07aa

Servlet中提供了8个监听器

==一类==:监听三个域对象的创建和销毁的监听器
对象类型 对应的监听器
ServletContext ServletContextListener
HttpSession HttpSessionListener
HttpServletRequest ServletRequestListener

==二类==:监听三个域对象的属性变更的监听器.(属性添加,属性移除,属性替换)
对象类型 对应的监听器
ServletContext ServletContextAttributeListener
HttpServletRequest ServletRequestAttributeListener
HttpSession HttpSessionAttributeListener

==三类==:监听HttpSession对象中的JavaBean的状态的改变.(绑定,解除绑定,钝化和活化)2个
对象类型 对应的监听器
HttpSession HttpSessionBindingListener(绑定,解除绑定)
HttpSession HttpSessionActivationListener(钝化和活化)

insert和replace相关

insert ignore into 和insert into

INSERT IGNORE 与INSERT INTO的区别就是INSERT IGNORE会忽略数据库中已经存在 的数据,如果数据库没有数据,就插入新的数据,如果有数据的话就跳过这条数据。这样就可以保留数据库中已经存在数据,达到在间隙中插入数据的目的。

replace into

replace into表示插入替换数据,需求表中有PrimaryKey,或者unique索引的话,如果数据库已经存在数据,则用新数据替换,如果没有数据效果则和insert into一样;
REPLACE语句会返回一个数,来指示受影响的行的数目。该数是被删除和被插入的行数的和。如果对于一个单行REPLACE该数为1,则一行被插入,同时没有行被删除。如果该数大于1,则在新行被插入前,有一个或多个旧行被删除。如果表包含多个唯一索引,并且新行复制了在不同的唯一索引中的不同旧行的值,则有可能是一个单一行替换了多个旧行。

insert into … on duplicate key update

insert语句的末尾添加on duplicate key update语法:如果插入行出现唯一索引或者主键重复时,则执行旧的update;如果不会导致唯一索引或者主键重复时,就直接添加新行。
https://blog.csdn.net/qq_41070393/article/details/82422632
附加条件:
https://blog.csdn.net/hellozhxy/article/details/80945553

Sha1加密和HmacSHA1相关

1
2
3
4
5
6
7
8
9
10
11
Sha1加密:

private static final char[] _HEX_DIGITS_ = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};

private static String getFormattedText(byte[] bytes) { int len = bytes.length; StringBuilder buf = new StringBuilder(len * 2); // 把密文转换成十六进制的字符串形式 for (int j = 0; j < len; j++) { buf.append(_HEX_DIGITS_[(bytes[j] >> 4) & 0x0f]); buf.append(_HEX_DIGITS_[bytes[j] & 0x0f]); } return buf.toString(); }

public String payload(@RequestBody String body, HttpServletRequest request) {

MessageDigest sha1 = MessageDigest.getInstance("SHA-1"); String secret = "123321Zh"; sha1.update(secret.getBytes("utf-8")); byte[] digest = sha1.digest(); String result = _getFormattedText_(digest);

}

Java HmacSHA1算法:(HMAC(散列消息身份验证码: Hashed Message Authentication Code))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static String hmacSha1(String src, String key) {

        try {

SecretKeySpec signingKey = new SecretKeySpec(key.getBytes("utf-8"), "HmacSHA1");

            Mac mac = Mac.getInstance("HmacSHA1");

            mac.init(signingKey);

            byte[] rawHmac = mac.doFinal(src.getBytes("utf-8"));

            return Hex.encodeHexString(rawHmac);

} catch (Exception e) {

            throw new RuntimeException(e);

        }

    }

0315的总结

–1.save和saveandflush
save不会立刻提交到数据库,flush则立刻提交生效,save可能只是修改在内存中的
–2.webhook
3.HMAC,SHA1
–4.ngrok
https://tonybai.com/2015/05/14/ngrok-source-intro/
–5.node vue
–6.JodaTimeConverter

–7.jackson

–8.jdbctemplate namedParameterJdbcTemplate
https://segmentfault.com/a/1190000010907688
–9.rabbit(exchange type,routingkey,queue,channel)
–0.zookeeper的配置属性,集群(hostname)
11.npm i和npm install的区别
https://blog.csdn.net/chern1992/article/details/79193211
–12.搭建hexo博客相关
https://blog.csdn.net/sinat_37781304/article/details/82729029
13.阿里巴巴为什么不用 ZooKeeper 做服务发现?
https://yq.aliyun.com/articles/599997
14.what are webhooks?
https://zapier.com/blog/what-are-webhooks/
You’re only limited by your imagination.

Zookeeper的搭建

一、Zookeeper的搭建方式

Zookeeper安装方式有三种,单机模式集群模式以及伪集群模式

■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境;
■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例;
■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble)

Zookeeper通过复制来实现高可用性,只要集合体中半数以上的机器处于可用状态,它就能够保证服务继续。为什么一定要超过半数呢?这跟Zookeeper的复制策略有关:zookeeper确保对znode 树的每一个修改都会被复制到集合体中超过半数的机器上。

1.1 Zookeeper的单机模式搭建

下载ZooKeeper

解压:tar -zxvf zookeeper-3.4.5.tar.gz 重命名:mv zookeeper-3.4.5 zk

配置文件:在conf目录下删除zoo_sample.cfg文件,创建一个配置文件zoo.cfg。

tickTime=2000
dataDir=/usr/local/zk/data
dataLogDir=/usr/local/zk/dataLog
clientPort=2181

配置环境变量:为了今后操作方便,我们需要对Zookeeper的环境变量进行配置,方法如下在/etc/profile文件中加入如下内容:

export ZOOKEEPER_HOME=/usr/local/zk
export PATH=.:$HADOOP_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH

启动ZooKeeper的Server:zkServer.sh start;
关闭ZooKeeper的Server:zkServer.sh stop

1.2 Zookeeper的伪集群模式搭建

Zookeeper不但可以在单机上运行单机模式Zookeeper,而且可以在单机模拟集群模式 Zookeeper的运行,也就是将不同节点运行在同一台机器。我们知道伪分布模式下Hadoop的操作和分布式模式下有着很大的不同,但是在集群为分布 式模式下对Zookeeper的操作却和集群模式下没有本质的区别。显然,集群伪分布式模式为我们体验Zookeeper和做一些尝试性的实验提供了很大 的便利。比如,我们在实验的时候,可以先使用少量数据在集群伪分布模式下进行测试。当测试可行的时候,再将数据移植到集群模式进行真实的数据实验。这样不 但保证了它的可行性,同时大大提高了实验的效率。这种搭建方式,比较简便,成本比较低,适合测试和学习,如果你的手头机器不足,就可以在一台机器上部署了 3个server。

1.2.1. 注意事项

在一台机器上部署了3个server,需要注意的是在集群为分布式模式下我们使用的每个配置文档模拟一台机器,也就是说单台机器及上运行多个Zookeeper实例。但是,必须保证每个配置文档的各个端口号不能冲突,除了clientPort不同之外,dataDir也不同。另外,还要在dataDir所对应的目录中创建myid文件来指定对应的Zookeeper服务器实例。

■ clientPort端口:如果在1台机器上部署多个server,那么每台机器都要不同的 clientPort,比如 server1是2181,server2是2182,server3是2183

■ dataDir和dataLogDir:dataDir和dataLogDir也需要区分下,将数据文件和日志文件分开存放,同时每个server的这两变量所对应的路径都是不同的

■ server.X和myid: server.X 这个数字就是对应,data/myid中的数字。在3个server的myid文件中分别写入了0,1,2,那么每个server中的zoo.cfg都配 server.0 server.2,server.3就行了。因为在同一台机器上,后面连着的2个端口,3个server都不要一样,否则端口冲突

下面是我所配置的集群伪分布模式,分别通过zoo1.cfg、zoo2.cfg、zoo3.cfg来模拟由三台机器的Zookeeper集群,
代码清单 zoo1.cfg如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial
# synchronization phase can take
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.
dataDir=/usr/local/zk/data_1

# the port at which the clients will connect
clientPort=2181

#the location of the log file
dataLogDir=/usr/local/zk/logs_1

server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389

代码清单 zoo2.cfg如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial
# synchronization phase can take
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.
dataDir=/usr/local/zk/data_2

# the port at which the clients will connect
clientPort=2182

#the location of the log file
dataLogDir=/usr/local/zk/logs_2

server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389

代码清单 zoo3.cfg如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial
# synchronization phase can take
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.
dataDir=/usr/local/zk/data_3

# the port at which the clients will connect
clientPort=2183

#the location of the log file
dataLogDir=/usr/local/zk/logs_3

server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389

1.2.2 启动

在集群为分布式下,我们只有一台机器,按时要运行三个Zookeeper实例。此时,如果在使用单机模式的启动命令是行不通的。此时,只要通过下面三条命令就能运行前面所配置的Zookeeper服务。如下所示:

zkServer.sh start zoo1.sh
zkServer.sh start zoo2.sh
zkServer.sh start zoo3.sh

启动过程,如下图所示:

1

启动结果,如下图所示:

2

在运行完第一条指令之后,会出现一些错误异常,产生异常信息的原因是由于Zookeeper 服务的每个实例都拥有全局配置信息,他们在启动的时候会随时随地的进行Leader选举操作。此时,第一个启动的Zookeeper需要和另外两个 Zookeeper实例进行通信。但是,另外两个Zookeeper实例还没有启动起来,因此就产生了这的异样信息。我们直接将其忽略即可,待把图中“2 号”和“3号”Zookeeper实例启动起来之后,相应的异常信息自然会消失。此时,可以通过下面三条命令,来查询。

zkServer.sh status zoo1.cfg
zkServer.sh status zoo2.cfg
zkServer.sh status zoo3.cfg

Zookeeper服务的运行状态,如下图所示:

3

1.3 Zookeeper的集群模式搭建

为了获得可靠地Zookeeper服务,用户应该在一个机群上部署Zookeeper。只要机群上大多数的Zookeeper服务启动了,那么总的 Zookeeper服务将是可用的。集群的配置方式,和前两种类似,同样需要进行环境变量的配置。在每台机器上conf/zoo.cf配置文件的参数设置 相同

1.3.1 创建myid

在dataDir(/usr/local/zk/data)目录创建myid文件

Server0机器的内容为:0
Server1机器的内容为:1
Server2机器的内容为:2

1.3.2 编写配置文件

在conf目录下删除zoo_sample.cfg文件,创建一个配置文件zoo.cfg,如下所示,代码清单 zoo.cfg中的参数设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial
# synchronization phase can take
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.
dataDir=/usr/local/zk/data

# the port at which the clients will connect
clientPort=2183

#the location of the log file
dataLogDir=/usr/local/zk/log

server.0=hadoop:2288:3388
server.1=hadoop0:2288:3388
server.2=hadoop1:2288:3388

1.3.3 启动

分别在3台机器上启动ZooKeeper的Server:zkServer.sh start;

二、Zookeeper的配置

Zookeeper的功能特性是通过Zookeeper配置文件来进行控制管理的(zoo.cfg).这样的设计其实有其自身的原因,通过前面对Zookeeper的配置可以看出,在对Zookeeper集群进行配置的时候,它的配置文档是完全相同的。集群伪分布模式中,有少部分是不同的。这样的配置方式使得在部署Zookeeper服务的时候非常方便。如果服务器使用不同的配置文件,必须确保不同配置文件中的服务器列表相匹配。

在设置Zookeeper配置文档时候,某些参数是可选的,某些是必须的。这些必须参数就构成了Zookeeper配置文档的最低配置要求。另外,若要对Zookeeper进行更详细的配置,可以参考下面的内容。

2.1 基本配置

下面是在最低配置要求中必须配置的参数:

(1) client:监听客户端连接的端口。
(2) tickTime:基本事件单元,这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,每隔tickTime时间就会发送一个心跳;最小 的session过期时间为2倍tickTime   
dataDir:存储内存中数据库快照的位置,如果不设置参数,更新食物的日志将被存储到默认位置。

应该谨慎的选择日志存放的位置,使用专用的日志存储设备能够大大提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会很大程度上影像系统性能。

2.2 高级配置

下面是高级配置参数中可选配置参数,用户可以使用下面的参数来更好的规定Zookeeper的行为:

(1) dataLogdDir

这个操作让管理机器把事务日志写入“dataLogDir”所指定的目录中,而不是“dataDir”所指定的目录。这将允许使用一个专用的日志设备,帮助我们避免日志和快照的竞争。配置如下:

1
2
# the directory where the snapshot is stored
dataDir=/usr/local/zk/data

(2)maxClientCnxns

这个操作将限制连接到Zookeeper的客户端数量,并限制并发连接的数量,通过IP来区分不同的客户端。此配置选项可以阻止某些类别的Dos攻击。将他设置为零或忽略不进行设置将会取消对并发连接的限制。

例如,此时我们将maxClientCnxns的值设为1,如下所示:

1
2
# set maxClientCnxns
maxClientCnxns=1

启动Zookeeper之后,首先用一个客户端连接到Zookeeper服务器上。之后如果有第二个客户端尝试对Zookeeper进行连接,或者有某些隐式的对客户端的连接操作,将会触发Zookeeper的上述配置。

(3)minSessionTimeout和maxSessionTimeout

即最小的会话超时和最大的会话超时时间。在默认情况下,minSession=2tickTime;maxSession=20tickTime。

2.3 集群配置

(1) initLimit

此配置表示,允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。

(2) syncLimit

此配置项表示Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。

(3) server.A=B:C:D

A:其中 A 是一个数字,表示这个是服务器的编号;
B:是这个服务器的 ip 地址;
C:Leader选举的端口;
D:Zookeeper服务器之间的通信端口。

(4) myid 和 zoo.cfg

除了修改 zoo.cfg 配置文件,集群模式下还要配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面就有一个数据就是 A 的值,Zookeeper 启动时会读取这个文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是那个 server。

三、搭建ZooKeeper服务器集群

搭建要求:

(1) zk服务器集群规模不小于3个节点
(2) 要求各服务器之间系统时间要保持一致。

3.1 安装配置ZK

(1) 使用WinScp将Zk传输到Hadoop主机上的/usr/local,我用的版本是zookeeper-3.4.5.tar.gz。

(2) 在hadoop的/usr/local目录下,解压缩zk….tar.gz,设置环境变量

解压缩: 在/usr/local目录下,执行命令:tar -zxvf zookeeper-3.4.5.tar.gz,如下图所示:
4

重命名:解压后将文件夹,重命名为zk,执行命令: mv zookeeper-3.4.5 zk,如下图所示:
5

设置环境变量:执行命令: vi /etc/profile ,添加 :export ZOOKEEPER_HOME=/usr/local/zk,如图2.3所示的内容。执行命令:source /etc/profile 如下图所示:

6

15

3.2 修改ZK配置文件

(1) 重命名:将/usr/local/zk/conf目录下zoo_sample.cfg,重命名为zoo.cfg,执行命令:mv zoo_sample.cfg zoo.cfg。如如下图所示:
7

(2) 查看:在/usr/local/zk/conf目录下,修改文件 vi zoo.cfg,文件内容如下图所示。在该文件中dataDir表示文件存放目录,它的默认设置为/tmp/zookeeper这是一个临时存放目录,每 次重启后会丢失,在这我们自己设一个目录,/usr/local/zk/data。

8

(3) 创建文件夹:mkdir /usr/local/zk/data

(4) 创建myid:在data目录下,创建文件myid,值为0;vi myid ;内容为0。

(5) 编辑:编辑该文件,执行vi zoo.cfg ,修改dataDir=/usr/local/zk/data。

新增

server.0=hadoop:2888:3888
server.1=hadoop0:2888:3888
server.2=hadoop1:2888:3888

tickTime :这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳;

dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里;

clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

当这些配置项配置好后,就可以启动 Zookeeper 了,启动后使用命令echo ruok | nc localhost 2181检查 Zookeeper 是否已经在服务。

2.3 配置其他节点

(1) 把haooop主机的zk目录和/etc/profile目录,复制到hadoop0和hadoop1中。执行命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scp -r /usr/local/zk/ hadoop0:/usr/local/
scp -r /usr/local/zk/ hadoop1:/usr/local/
      scp /etc/profile hadoop0:/etc/
      scp /etc/profile hadoop1:/etc/

      ssh hadoop0
      suorce /etc/profile
      vi /usr/local/zk/data/myid
      exit

      ssh hadoop1
      suorce /etc/profile
      vi /usr/local/zk/data/myid
      exit

(2) 把hadoop1中相应的myid的值改为1,把hadoop2中相应的myid的值改为2。   

四、启动检验

(1) 启动,在三个节点上分别执行命令zkServer.sh start

hadoop节点

9

hadoop0节点

10

hadoop1节点

11

(2) 检验,在三个节点上分别执行命令zkServer.sh status,从下面的图中我们会发现hadoop和hadoop1为Follower,hadoop0为Leader。

hadoop节点

12

hadoop0节点

13

hadoop1节点

14