Flink源码解析(一)
2024-12-18 11:49:18 # Flink # 源码解析 # Flink通信基础--Akka组件通信

写在前面

这个关于flink源码的篇章可能会持续更新很久,大概可能会有10多篇左右,对于flink的源码一直想好好整理一下,但是一直断断续续的,碎片化的知识太多,实在是不成体系。有时间应该也会对之前的一些篇章的内容进行重构,文章结构和分类也乱七八糟。flink的源码确实需要好好理解一下,这应该是一个大工程,整体完成可能需要持续一个多月的时间。写给一个多月后的自己,希望能静下心来好好完善这个篇章

Akka组件

主要是jobMaster,ResourceManager和TaskExecutor的通信和设计

flink对Akka进行了封装

在Akka中,Actor之间通信使用消息传递

Akka结构

Akka的Actor属于父Actor,调用getContext().actorOf()创建Actor,创建的这个Actor会到已经存在的树下面的子节点中

所有的Actor都有一个共同的父节点果我们用system.actorOf(…, "someActor")创建一个名为someActor的 Actor,它的引用将包括路径/user/someActor

创建Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ActorHierarchyExperiments {
public static void main(String[] args) throws java.io.IOException {
ActorSystem system = ActorSystem.create("testSystem");

ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor");
System.out.println("First: " + firstRef);
firstRef.tell("printit", ActorRef.noSender());

System.out.println(">>> Press ENTER to exit <<<");
try {
System.in.read();
} finally {
system.terminate();
}
}
}
/**
* 创建Actor
* */
class PrintMyActorRefActor extends AbstractActor {
static Props props() {
return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("printit", p -> {
ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
System.out.println("Second: " + secondRef);
})
.build();
}
}
1
2
3
4

First: Actor[akka://testSystem/user/first-actor#1130048041]
>>> Press ENTER to exit <<<
Second: Actor[akka://testSystem/user/first-actor/second-actor#357662716]

停止Actor

每当一个 Actor 被停止时,它的所有子 Actor 也会被递归地停止。这种行为大大简化了资源清理,并有助于避免诸如由打开的套接字和文件引起的资源泄漏。

停止Actor使用的是调用Actor内部的getContext().stop(getSelf())来实现的

最主要的是prestart方法和prestop方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class StartStopActor1 extends AbstractActor {
static Props props() {
return Props.create(StartStopActor1.class, StartStopActor1::new);
}

@Override
public void preStart() {
System.out.println("first started");
getContext().actorOf(StartStopActor2.props(), "second");
}

@Override
public void postStop() {
System.out.println("first stopped");
}

@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("stop", s -> {
getContext().stop(getSelf());
})
.build();
}
}

public class StartStopActor2 extends AbstractActor {
static Props props() {
return Props.create(StartStopActor2.class, StartStopActor2::new);
}

@Override
public void preStart() {
System.out.println("second started");
}

@Override
public void postStop() {
System.out.println("second stopped");
}

@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}


public class TaskMain {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef first = system.actorOf(StartStopActor1.props(), "first");
first.tell("stop", ActorRef.noSender());
}
}
1
2
3
4
5
6
总结:
first started
second started
second stopped
first stopped

停止一个Actor的时候会先停止子Actor一层一层再到这个Actor上

Actor的失败处理

当一个子的Actor失败之后(也就是抛出一个异常或者出现一个异常的时候)会暂时挂起,将失败信息传到他的父Actor上,然后看看这个父Actor是设置的怎么处理异常的,我们称父Actor就是一个监管者

默认的监管策略是重新启动子Actor

如果不更改默认的话,那么失败的话就会导致重新启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class SupervisingActor extends AbstractActor {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor");
supervisingActor.tell("failChild", ActorRef.noSender());
}

static Props props() {
return Props.create(SupervisingActor.class, SupervisingActor::new);
}

ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor");

@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("failChild", f -> {
child.tell("fail", getSelf());
})
.build();
}
}

class SupervisedActor extends AbstractActor {
static Props props() {
return Props.create(SupervisedActor.class, SupervisedActor::new);
}

@Override
public void preStart() {
System.out.println("supervised actor started");
}

@Override
public void postStop() {
System.out.println("supervised actor stopped");
}

@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("fail", f -> {
System.out.println("supervised actor fails now");
throw new Exception("I failed!");
})
.build();
}
}

1
2
3
4
5
6
7
8
9
10
11

[2022-01-07 11:55:17,219] [INFO] [akka.event.slf4j.Slf4jLogger] [testSystem-akka.actor.default-dispatcher-4] [] - Slf4jLogger started
supervised actor started
supervised actor fails now
supervised actor stopped
supervised actor started
[2022-01-07 11:55:17,469] [ERROR] [akka.actor.OneForOneStrategy] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/supervising-actor/supervised-actor] - I failed!
java.lang.Exception: I failed!

我们看到失败后,被监督的 Actor 停止并立即重新启动。我们还看到一个日志条目,报告处理的异常,是测试异常。我们使用了preStart()和postStop()钩子,这是重启后和重启前默认调用的钩子,因此我们无法区分 Actor 内部是第一次启动还是重启。这通常是正确的做法,重新启动的目的是将 Actor 设置为已知的良好状态,这通常意味着一个干净的开始阶段。实际上,在重新启动时,调用的是preRestart()和postRestart()方法,但如果不重写这两个方法,则默认分别委托给postStop()和preStart()。

Actor的创建

下面写一个简单的创建Actor的模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

public class IotSupervisor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

public static Props props() {
return Props.create(IotSupervisor.class, IotSupervisor::new);
}

@Override
public void preStart() {
log.info("IoT Application started");
}

@Override
public void postStop() {
log.info("IoT Application stopped");
}

// No need to handle any messages
@Override
public Receive createReceive() {
return receiveBuilder()
.build();
}
}

public class IotMain {

public static void main(String[] args) throws IOException {
ActorSystem system = ActorSystem.create("iot-system");

try {
// Create top level supervisor
ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor");

System.out.println("Press ENTER to exit the system");
System.in.read();
} finally {
system.terminate();
}
}
}


发送消息与发送响应

处理每条数据主要是在重写createReceive方法中实现的

下面还有一个示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

final String groupId;

final String deviceId;

public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}

public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, () -> new Device(groupId, deviceId));
}

public static final class RecordTemperature {
final long requestId;
final double value;

public RecordTemperature(long requestId, double value) {
this.requestId = requestId;
this.value = value;
}
}

public static final class TemperatureRecorded {
final long requestId;

public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}

public static final class ReadTemperature {
final long requestId;

public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}

public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;

public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
//记录上一次的消息内容
Optional<Double> lastTemperatureReading = Optional.empty();

@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}

@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(RecordTemperature.class, r -> {
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
//getSender()发送方传递过来的引用

getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
})
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.match(RespondTemperature.class,r ->{

log.info("开始");
getSelf().tell("test",getSelf());

})
.build();
}
}


public class Test1 {

public static void main(String[] args) {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
deviceActor.tell(new Device.RecordTemperature(1L, 24.0),deviceActor);
deviceActor.tell(new Device.ReadTemperature(2L), deviceActor);
}

}

这里代码里面deviceActor.tell(new Device.RecordTemperature(1L, 24.0), deviceActor); 这里的内容的第二个参数表示消息的接收者也就是消息的发送目标

这个代码里面通过 deviceActor.tell() 将消息发送到 deviceActor 自己,确保了消息处理顺序的一致性(例如,RecordTemperature 被处理完之后,才会处理 ReadTemperature)。这就是 Akka 的 消息传递顺序 和 并发性 之间的平衡

getSender().tell(new TemperatureRecorded(r.requestId), getSelf());这里的getSender和getSelf

getSender是返回发送消息给当前的actor的actor,getSelf是返回当前的actor,这里的目的是让当前 actor(即 deviceActor)向发送 RecordTemperature 消息的 actor 发送响应(回应)

这里举一个示例的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ActorA
public class ActorA extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
System.out.println("ActorA received message: " + msg);
// ActorA 向 ActorB 发送消息
getContext().actorOf(Props.create(ActorB.class)).tell("Hello from ActorA", getSelf());
})
.build();
}
}

// ActorB
public class ActorB extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
System.out.println("ActorB received message: " + msg);

// 使用 getSender() 和 getSelf() 打印消息
System.out.println("Sender: " + getSender());
System.out.println("Self: " + getSelf());

// ActorB 向 ActorA 发送回复
getSender().tell("Reply from ActorB", getSelf());
})
.build();
}

日志输出如下:

1
2
3
4
ActorA received message: Hello from ActorA
ActorB received message: Hello from ActorA
Sender: Actor[akka://testSystem/user/$a#123456789]
Self: Actor[akka://testSystem/user/$b#987654321]

Akka具有很好的有序性:
在 Akka 中 ,对于一对给定的 Actor,直接从第一个 Actor 发送到第二个 Actor 的消息不会被无序接收。

  • Actor A1 向 A2 发送消息M1、M2和M3。
  • Actor A3 向 A2 发送消息M4、M5和M6。
  • 这意味着,对于 Akka 信息:
    • 如果M1传递,则必须在M2和M3之前传递。
    • 如果M2传递,则必须在M3之前传递。
    • 如果M4传递,则必须在M5和M6之前传递。
    • 如果M5传递,则必须在M6之前传递。
    • A2 可以看到 A1 的消息与 A3 的消息交织在一起。
    • 由于没有保证的传递,任何消息都可能丢失,即不能到达 A2。

flink主要使用的就是Akka的Actor 和Future

小方法的使用

在 Actor 中,使用小方法也是一种很好的做法。建议将消息处理的实际工作委托给方法,而不是在每个lambda中定义具有大量代码的大型ReceiveBuilder。一个结构良好的 Actor 可以是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static class WellStructuredActor extends AbstractActor {

public static class Msg1 {}

public static class Msg2 {}

public static class Msg3 {}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(Msg1.class, this::receiveMsg1)
.match(Msg2.class, this::receiveMsg2)
.match(Msg3.class, this::receiveMsg3)
.build();
}

private void receiveMsg1(Msg1 msg) {
// actual work
}

private void receiveMsg2(Msg2 msg) {
// actual work
}

private void receiveMsg3(Msg3 msg) {
// actual work
}
}

接收超时

ActorContext的setReceiveTimeout方法定义了非活动超时,在该超时之后,将触发发送ReceiveTimeout消息。指定超时时间后,接收函数应该能够处理akka.actor.ReceiveTimeout消息。1毫秒是支持的最小超时时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static class ReceiveTimeoutActor extends AbstractActor {
public ReceiveTimeoutActor() {
// To set an initial delay
getContext().setReceiveTimeout(Duration.ofSeconds(10));
}

@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"Hello",
s -> {
// To set in a response to a message
getContext().setReceiveTimeout(Duration.ofSeconds(1));
})
.match(
ReceiveTimeout.class,
r -> {
// To turn it off
getContext().cancelReceiveTimeout();
})
.build();
}
}

这里的逻辑就是首先设置了接收超时时间是10s,如果10s没接收到消息就会触发ReceiveTimeout

然后如果接收到Hello消息,就会设置超时1s

如果接收到ReceiveTimeout的消息就会调用 getContext().cancelReceiveTimeout();来取消超时

Actor的调度器

调度器是Akka的和核心,每个ActorSystem都有一个默认的调度器

我们可以在程序中,指定使用的调度器

1
2
ActorRef myActor = system.actorOf(Props.create(MyActor.class).withDispatcher("my-dispatcher"), "myactor3");

这里的withDispatcher就是使用了指定的Dispatcher调度器

调度器有三种类型,

  • Dispatcher:基于事件的调度器,可共享
  • PinnedDispatcher:这个调度器为每个使用它的 Actor 指定唯一的线程;即每个 Actor 将拥有自己的线程池,池中只有一个线程。
  • CallingThreadDispatcher:此调度器仅在当前调用的线程上运行。这个调度器不创建任何新的线程。可共享

这个调度器这里就说到这里吧,因为有点多,这块没仔细的看,需要的时候再去官网上找一下示例资料吧