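Preface
This series on the Flink source code will probably keep growing for quite a while, likely to a dozen or more posts. I have long wanted to organize my understanding of the Flink source code properly, but I kept working on it on and off, and what I have is too fragmented to form a coherent whole. When I have time I will probably also restructure some of the earlier posts, whose structure and categories are a mess. The Flink source code really deserves careful study. This is a big project and will probably take more than a month to finish. A note to myself a month from now: I hope you can settle down and complete this series properly.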
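Akka components
This part mainly covers the communication between JobMaster, ResourceManager and TaskExecutor, and how that communication is designed.
Flink wraps Akka in its own abstraction layer.
In Akka, actors communicate by passing messages.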
Akka structure
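Every actor in Akka has a parent. Calling getContext().actorOf() inside an actor creates a new actor and attaches it as a child of that actor in the existing tree.
All user-created actors live under a common parent, the /user guardian: if we create an actor named someActor with system.actorOf(…, "someActor"), its reference will include the path /user/someActor.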
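The path rule above can be sketched with a toy model in plain Java. ToyActorNode is a hypothetical class for illustration only, not Akka's ActorPath. It assumes a path is simply the parent's path plus "/" plus the actor's own name; real Akka references additionally carry the akka://system-name prefix and a #uid suffix, as in the output of the next example.

```java
// Toy model (hypothetical, not Akka's ActorPath): a node's path is its
// parent's path plus "/" plus its own name.
final class ToyActorNode {
    private final ToyActorNode parent; // null only for the root
    private final String name;

    private ToyActorNode(ToyActorNode parent, String name) {
        this.parent = parent;
        this.name = name;
    }

    // Root of the tree; it contributes nothing to the path.
    static ToyActorNode root() {
        return new ToyActorNode(null, "");
    }

    // Plays the role of actorOf(..., name): the new node becomes a child of this one.
    ToyActorNode actorOf(String childName) {
        return new ToyActorNode(this, childName);
    }

    ToyActorNode parent() {
        return parent;
    }

    String path() {
        return parent == null ? "" : parent.path() + "/" + name;
    }
}
```

For example, ToyActorNode.root().actorOf("user").actorOf("someActor").path() returns "/user/someActor".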
Creating actors
```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class ActorHierarchyExperiments {
  public static void main(String[] args) throws java.io.IOException {
    ActorSystem system = ActorSystem.create("testSystem");

    // Top-level actor: its parent is the /user guardian
    ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor");
    System.out.println("First: " + firstRef);
    firstRef.tell("printit", ActorRef.noSender());

    System.out.println(">>> Press ENTER to exit <<<");
    try {
      System.in.read();
    } finally {
      system.terminate();
    }
  }
}

class PrintMyActorRefActor extends AbstractActor {
  static Props props() {
    return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("printit", p -> {
          // Created through getContext(), so it becomes a child of first-actor
          ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
          System.out.println("Second: " + secondRef);
        })
        .build();
  }
}
```
```
First: Actor[akka://testSystem/user/first-actor#1130048041]
>>> Press ENTER to exit <<<
Second: Actor[akka://testSystem/user/first-actor/second-actor#357662716]
```
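Stopping actors
Whenever an actor is stopped, all of its children are recursively stopped as well. This greatly simplifies resource cleanup and helps avoid resource leaks such as those caused by open sockets and files.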
An actor stops itself by calling getContext().stop(getSelf()) from inside the actor.
The most important lifecycle hooks here are preStart() and postStop().
```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

class StartStopActor1 extends AbstractActor {
  static Props props() {
    return Props.create(StartStopActor1.class, StartStopActor1::new);
  }

  @Override
  public void preStart() {
    System.out.println("first started");
    // Spawn a child so that the stop order can be observed
    getContext().actorOf(StartStopActor2.props(), "second");
  }

  @Override
  public void postStop() {
    System.out.println("first stopped");
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        // The actor stops itself; its child is stopped first
        .matchEquals("stop", s -> getContext().stop(getSelf()))
        .build();
  }
}

class StartStopActor2 extends AbstractActor {
  static Props props() {
    return Props.create(StartStopActor2.class, StartStopActor2::new);
  }

  @Override
  public void preStart() {
    System.out.println("second started");
  }

  @Override
  public void postStop() {
    System.out.println("second stopped");
  }

  @Override
  public Receive createReceive() {
    // Handles no messages
    return receiveBuilder().build();
  }
}

public class TaskMain {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("testSystem");
    ActorRef first = system.actorOf(StartStopActor1.props(), "first");
    first.tell("stop", ActorRef.noSender());
  }
}
```
Output:
```
first started
second started
second stopped
first stopped
```
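When an actor is stopped, its children are stopped first, level by level, and only then the actor itself; that is why "second stopped" is printed before "first stopped".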
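The children-first stop order can be sketched with a toy model in plain Java. ToyStoppable is a hypothetical class, not Akka code, and it is synchronous for simplicity: in real Akka stopping is asynchronous, and the parent's postStop() runs only after all of its children have terminated. The log strings mirror the output above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of recursive stopping (hypothetical, not Akka code): a node
// stops all of its children first, then runs its own "postStop".
final class ToyStoppable {
    private final String name;
    private final List<ToyStoppable> children = new ArrayList<>();
    private final List<String> log; // shared event log so the order is visible

    ToyStoppable(String name, List<String> log) {
        this.name = name;
        this.log = log;
        log.add(name + " started"); // stands in for preStart()
    }

    // Stands in for getContext().actorOf(...): create and remember a child.
    ToyStoppable spawn(String childName) {
        ToyStoppable child = new ToyStoppable(childName, log);
        children.add(child);
        return child;
    }

    void stop() {
        // Children first (recursively), so leaves stop before their parents.
        for (ToyStoppable child : children) {
            child.stop();
        }
        children.clear();
        log.add(name + " stopped"); // stands in for postStop()
    }
}
```

Creating "first", spawning "second" under it and calling stop() on "first" yields the same four lines as the Akka output above.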
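Failure handling
When a child actor fails (that is, it throws an exception), it is temporarily suspended and the failure is reported to its parent. The parent then handles the failure according to the strategy it has configured; in this role the parent is called the child's supervisor.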
The default supervisor strategy restarts the failing child, so unless you change it, a failure (an ordinary exception) leads to a restart.
```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class SupervisingActor extends AbstractActor {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("testSystem");
    ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor");
    supervisingActor.tell("failChild", ActorRef.noSender());
  }

  static Props props() {
    return Props.create(SupervisingActor.class, SupervisingActor::new);
  }

  // Created when this actor is constructed; this actor is the child's supervisor
  ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor");

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("failChild", f -> child.tell("fail", getSelf()))
        .build();
  }
}

class SupervisedActor extends AbstractActor {
  static Props props() {
    return Props.create(SupervisedActor.class, SupervisedActor::new);
  }

  @Override
  public void preStart() {
    System.out.println("supervised actor started");
  }

  @Override
  public void postStop() {
    System.out.println("supervised actor stopped");
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("fail", f -> {
          System.out.println("supervised actor fails now");
          // Reported to the parent, whose default strategy restarts this actor
          throw new Exception("I failed!");
        })
        .build();
  }
}
```
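```
[2022-01-07 11:55:17,219] [INFO] [akka.event.slf4j.Slf4jLogger] [testSystem-akka.actor.default-dispatcher-4] [] - Slf4jLogger started
supervised actor started
supervised actor fails now
supervised actor stopped
supervised actor started
[2022-01-07 11:55:17,469] [ERROR] [akka.actor.OneForOneStrategy] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/supervising-actor/supervised-actor] - I failed!
java.lang.Exception: I failed!
```
We can see that after the failure the supervised actor is stopped and immediately restarted. We also see a log entry reporting the handled exception, which is our test exception. We used the preStart() and postStop() hooks, which by default are called after and before a restart respectively, so from inside the actor we cannot tell whether it is starting for the first time or being restarted. This is usually the right thing to do: the purpose of a restart is to put the actor back into a known-good state, which usually means a clean start. Strictly speaking, a restart calls the preRestart() and postRestart() methods, but if you do not override them they delegate to postStop() and preStart() respectively.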
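The restart sequence above can be sketched with a toy model in plain Java. ToyChild and ToySupervisor are hypothetical classes, not Akka's implementation; they assume a "restart on any Exception" rule in the spirit of the default strategy, and hook methods that delegate exactly as described (preRestart to postStop, postRestart to preStart). As in real Akka, the restart replaces the old instance with a fresh one.

```java
import java.util.List;

// Toy child (hypothetical): lifecycle hooks with the default delegation.
class ToyChild {
    private final List<String> log;

    ToyChild(List<String> log) {
        this.log = log;
    }

    void preStart() { log.add("supervised actor started"); }
    void postStop() { log.add("supervised actor stopped"); }

    // Defaults: delegate to postStop()/preStart() unless overridden.
    void preRestart(Exception reason) { postStop(); }
    void postRestart(Exception reason) { preStart(); }

    void receive(String msg) throws Exception {
        if (msg.equals("fail")) {
            log.add("supervised actor fails now");
            throw new Exception("I failed!");
        }
    }
}

// Toy supervisor (hypothetical): restarts the child whenever it throws.
class ToySupervisor {
    private final List<String> log;
    private ToyChild child;

    ToySupervisor(List<String> log) {
        this.log = log;
        this.child = new ToyChild(log);
        child.preStart();
    }

    void deliver(String msg) {
        try {
            child.receive(msg);
        } catch (Exception e) {
            // Old instance runs preRestart, a fresh instance runs postRestart.
            child.preRestart(e);
            child = new ToyChild(log);
            child.postRestart(e);
        }
    }
}
```

Delivering "fail" once produces the same four lifecycle lines as the Akka log above; any other message leaves the child untouched.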
Actor creation
Below is a simple template for creating an actor.
```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.io.IOException;

class IotSupervisor extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  public static Props props() {
    return Props.create(IotSupervisor.class, IotSupervisor::new);
  }

  @Override
  public void preStart() {
    log.info("IoT Application started");
  }

  @Override
  public void postStop() {
    log.info("IoT Application stopped");
  }

  // No messages are handled yet; this is just the skeleton
  @Override
  public Receive createReceive() {
    return receiveBuilder().build();
  }
}

public class IotMain {
  public static void main(String[] args) throws IOException {
    ActorSystem system = ActorSystem.create("iot-system");
    try {
      // Create the top-level supervisor
      ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor");
      System.out.println("Press ENTER to exit the system");
      System.in.read();
    } finally {
      system.terminate();
    }
  }
}
```
Sending messages and replies
Each incoming message is handled in the overridden createReceive method.
Here is another example:
```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.Optional;

class Device extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;
  final String deviceId;

  public Device(String groupId, String deviceId) {
    this.groupId = groupId;
    this.deviceId = deviceId;
  }

  public static Props props(String groupId, String deviceId) {
    return Props.create(Device.class, () -> new Device(groupId, deviceId));
  }

  // Message protocol: immutable message classes
  public static final class RecordTemperature {
    final long requestId;
    final double value;

    public RecordTemperature(long requestId, double value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  public static final class TemperatureRecorded {
    final long requestId;

    public TemperatureRecorded(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReadTemperature {
    final long requestId;

    public ReadTemperature(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class RespondTemperature {
    final long requestId;
    final Optional<Double> value;

    public RespondTemperature(long requestId, Optional<Double> value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  Optional<Double> lastTemperatureReading = Optional.empty();

  @Override
  public void preStart() {
    log.info("Device actor {}-{} started", groupId, deviceId);
  }

  @Override
  public void postStop() {
    log.info("Device actor {}-{} stopped", groupId, deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(RecordTemperature.class, r -> {
          log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
          lastTemperatureReading = Optional.of(r.value);
          // Acknowledge to whoever sent RecordTemperature (in Test1: deviceActor
          // itself, which has no case for TemperatureRecorded, so it goes unhandled)
          getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
        })
        .match(ReadTemperature.class, r ->
            getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()))
        // Reached because Test1 passes deviceActor itself as the sender,
        // so the RespondTemperature reply comes back to this actor
        .match(RespondTemperature.class, r -> {
          log.info("start");
          // "test" matches no case, so it ends up as an unhandled message
          getSelf().tell("test", getSelf());
        })
        .build();
  }
}

public class Test1 {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("testSystem");
    ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
    // Second argument = sender: the device's replies are sent to deviceActor itself
    deviceActor.tell(new Device.RecordTemperature(1L, 24.0), deviceActor);
    deviceActor.tell(new Device.ReadTemperature(2L), deviceActor);
  }
}
```
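Look at deviceActor.tell(new Device.RecordTemperature(1L, 24.0), deviceActor); in this code.
The second argument of tell is the sender of the message, not its recipient: the recipient is always the actor on which tell is called. The sender is what the receiving actor gets back from getSender(), so it determines where replies are sent.
Here deviceActor is passed as its own sender, so every reply the device sends comes back to deviceActor itself. Ordering is a separate matter: because both tell calls are made one after the other from the same thread to the same actor, the messages enter the mailbox in that order and the actor processes them one at a time, so RecordTemperature is fully handled before ReadTemperature.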
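getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
About getSender() and getSelf() in this line: getSender() returns the actor that sent the message currently being processed, and getSelf() returns the current actor. The line therefore makes the current actor (deviceActor) send a response back to the actor that sent the RecordTemperature message, with itself recorded as the sender of that response.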
Here is an example:
```java
import akka.actor.AbstractActor;
import akka.actor.Props;

class ActorA extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        // ActorB's reply is also a String; handle it first so that it does not
        // fall into the case below and spawn a new ActorB in an endless loop
        .matchEquals("Reply from ActorB", msg -> {})
        .match(String.class, msg -> {
          System.out.println("ActorA received message: " + msg);
          // ActorB is created as a child of ActorA; ActorA is recorded as the sender
          getContext().actorOf(Props.create(ActorB.class)).tell("Hello from ActorA", getSelf());
        })
        .build();
  }
}

class ActorB extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(String.class, msg -> {
          System.out.println("ActorB received message: " + msg);
          System.out.println("Sender: " + getSender()); // ActorA
          System.out.println("Self: " + getSelf());     // ActorB
          // Reply to whoever sent the message
          getSender().tell("Reply from ActorB", getSelf());
        })
        .build();
  }
}
```
The log output is as follows:
```
ActorA received message: Hello from ActorA
ActorB received message: Hello from ActorA
Sender: Actor[akka://testSystem/user/$a#123456789]
Self: Actor[akka://testSystem/user/$a/$a#987654321]
```
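The sender mechanics used in both examples above can be sketched with a single-threaded toy model in plain Java. ToyRef is a hypothetical class, not Akka's ActorRef: it assumes each message is stored in a FIFO mailbox together with the ref passed as the second argument of tell, and the handler receives that ref as its "getSender()".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiConsumer;

// Toy, single-threaded model of tell(message, sender) (hypothetical, not Akka's API).
// The recipient is the ref tell() is called on; the second argument only records
// who sent the message, which is what the handler sees as its sender.
final class ToyRef {
    private static final class Envelope {
        final Object message;
        final ToyRef sender;

        Envelope(Object message, ToyRef sender) {
            this.message = message;
            this.sender = sender;
        }
    }

    final String name;
    final List<Object> received = new ArrayList<>();
    private final Queue<Envelope> mailbox = new ArrayDeque<>();
    // Handler (message, sender) -> side effects; plays the role of createReceive()
    BiConsumer<Object, ToyRef> behavior = (msg, sender) -> { };

    ToyRef(String name) {
        this.name = name;
    }

    void tell(Object message, ToyRef sender) {
        mailbox.add(new Envelope(message, sender));
    }

    // Takes the next message from the mailbox (FIFO) and runs the handler on it.
    boolean processOne() {
        Envelope envelope = mailbox.poll();
        if (envelope == null) {
            return false;
        }
        received.add(envelope.message);
        behavior.accept(envelope.message, envelope.sender);
        return true;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

If a "device" ref replies with sender.tell("ack:" + msg, device), a message sent with some client as sender produces a reply in the client's mailbox, while a message sent with the device itself as sender (as in Test1) produces a reply in the device's own mailbox.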
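Message ordering
Akka gives a useful ordering guarantee: for a given pair of actors, messages sent directly from the first actor to the second are never received out of order.
For example:
Actor A1 sends messages M1, M2 and M3 to A2.
Actor A3 sends messages M4, M5 and M6 to A2.
This means that, for Akka messages:
If M1 is delivered, it must be delivered before M2 and M3.
If M2 is delivered, it must be delivered before M3.
If M4 is delivered, it must be delivered before M5 and M6.
If M5 is delivered, it must be delivered before M6.
A2 may see A1's messages interleaved with A3's messages.
Since delivery is not guaranteed, any of these messages may be lost, i.e. never reach A2.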
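The per-sender ordering rule can be illustrated with a toy model in plain Java. ToyOrderingDemo is hypothetical and only an analogy, not Akka: it assumes A2's mailbox is one FIFO queue shared by two sender threads. Each sender enqueues in program order, so each sender's messages keep their relative order, while the two senders' messages may interleave differently from run to run. Unlike Akka, this toy never drops messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model (hypothetical, plain Java): two senders write into one FIFO mailbox.
final class ToyOrderingDemo {
    static List<String> run() {
        BlockingQueue<String> mailboxOfA2 = new LinkedBlockingQueue<>();
        Thread a1 = new Thread(() -> send(mailboxOfA2, "M1", "M2", "M3"));
        Thread a3 = new Thread(() -> send(mailboxOfA2, "M4", "M5", "M6"));
        a1.start();
        a3.start();
        try {
            a1.join();
            a3.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // A2 takes messages out in mailbox order.
        List<String> seenByA2 = new ArrayList<>();
        mailboxOfA2.drainTo(seenByA2);
        return seenByA2;
    }

    private static void send(BlockingQueue<String> mailbox, String... messages) {
        for (String m : messages) {
            mailbox.add(m);
        }
    }

    // True if every expected message is present and they appear in this relative order.
    static boolean inOrder(List<String> seen, String... expected) {
        int last = -1;
        for (String m : expected) {
            int idx = seen.indexOf(m);
            if (idx <= last) {
                return false;
            }
            last = idx;
        }
        return true;
    }
}
```

A run may yield, say, M1 M4 M2 M5 M3 M6 or M4 M5 M1 M6 M2 M3, but never M2 before M1 or M6 before M5.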
Flink mainly uses Akka's actors and futures.
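Using small methods
Using small methods is also good practice in an actor. It is better to delegate the actual work of handling each message to a method than to build one large ReceiveBuilder with lots of code in every lambda. A well-structured actor can look like this: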
```java
import akka.actor.AbstractActor;

class WellStructuredActor extends AbstractActor {
  public static class Msg1 {}
  public static class Msg2 {}
  public static class Msg3 {}

  @Override
  public Receive createReceive() {
    // Each case only dispatches; the actual work lives in a named method
    return receiveBuilder()
        .match(Msg1.class, this::receiveMsg1)
        .match(Msg2.class, this::receiveMsg2)
        .match(Msg3.class, this::receiveMsg3)
        .build();
  }

  private void receiveMsg1(Msg1 msg) {
    // handle Msg1
  }

  private void receiveMsg2(Msg2 msg) {
    // handle Msg2
  }

  private void receiveMsg3(Msg3 msg) {
    // handle Msg3
  }
}
```
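To show what a receiveBuilder-style chain does with these method references, here is a toy dispatcher in plain Java. ToyReceive and ToyWellStructured are hypothetical classes, not Akka's ReceiveBuilder; they assume the behavior of checking cases in registration order, running the first one whose type matches, and reporting a message that matches nothing as unhandled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy receiveBuilder-style chain (hypothetical): first matching type wins.
final class ToyReceive {
    private final List<Class<?>> types = new ArrayList<>();
    private final List<Consumer<Object>> handlers = new ArrayList<>();

    <T> ToyReceive match(Class<T> type, Consumer<T> handler) {
        types.add(type);
        handlers.add(msg -> handler.accept(type.cast(msg)));
        return this;
    }

    // Returns false when no case matches (an "unhandled" message).
    boolean apply(Object msg) {
        for (int i = 0; i < types.size(); i++) {
            if (types.get(i).isInstance(msg)) {
                handlers.get(i).accept(msg);
                return true;
            }
        }
        return false;
    }
}

// Small-methods style: every case delegates to a named private method.
final class ToyWellStructured {
    static final class Msg1 {}
    static final class Msg2 {}

    final List<String> handled = new ArrayList<>();
    final ToyReceive receive = new ToyReceive()
        .match(Msg1.class, this::receiveMsg1)
        .match(Msg2.class, this::receiveMsg2);

    private void receiveMsg1(Msg1 msg) { handled.add("msg1"); }
    private void receiveMsg2(Msg2 msg) { handled.add("msg2"); }
}
```

Keeping each lambda down to a method reference means the chain stays a readable table of "message type to handler", and each handler can be read on its own.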
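Receive timeout
The setReceiveTimeout method of ActorContext defines an inactivity timeout: once it elapses without any message arriving, a ReceiveTimeout message is sent to the actor. After a timeout has been set, the receive function should be able to handle the akka.actor.ReceiveTimeout message. 1 millisecond is the minimum supported timeout.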
```java
import akka.actor.AbstractActor;
import akka.actor.ReceiveTimeout;
import java.time.Duration;

class ReceiveTimeoutActor extends AbstractActor {
  public ReceiveTimeoutActor() {
    // Initial inactivity timeout: 10 seconds
    getContext().setReceiveTimeout(Duration.ofSeconds(10));
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("Hello", s -> {
          // Shorten the timeout to 1 second after receiving "Hello"
          getContext().setReceiveTimeout(Duration.ofSeconds(1));
        })
        .match(ReceiveTimeout.class, r -> {
          // The timeout fired: switch it off
          getContext().cancelReceiveTimeout();
        })
        .build();
  }
}
```
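The logic is as follows: the constructor first sets the receive timeout to 10 s, so if no message arrives within 10 s, a ReceiveTimeout message is triggered.
If a Hello message arrives, the timeout is changed to 1 s.
If the ReceiveTimeout message arrives, getContext().cancelReceiveTimeout() is called to cancel the timeout.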
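The same timeline can be traced with a toy model driven by an explicit clock instead of a real scheduler. ToyReceiveTimeout and its tick() method are hypothetical, not Akka APIs. The model assumes every ordinary message resets the inactivity timer, a non-positive timeout means "no timeout", and the ReceiveTimeout handler cancels the timeout as in the actor above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy receive-timeout model (hypothetical): time is passed in explicitly.
final class ToyReceiveTimeout {
    private long timeoutMillis;      // <= 0 means no timeout is set
    private long lastActivityMillis; // time of the last received message
    final List<String> events = new ArrayList<>();

    ToyReceiveTimeout(long nowMillis) {
        lastActivityMillis = nowMillis;
        setReceiveTimeout(10_000); // like the constructor above: 10 s
    }

    void setReceiveTimeout(long millis) { timeoutMillis = millis; }
    void cancelReceiveTimeout() { timeoutMillis = 0; }

    // Deliver an ordinary message; it resets the inactivity timer.
    void receive(String msg, long nowMillis) {
        lastActivityMillis = nowMillis;
        events.add(msg);
        if (msg.equals("Hello")) {
            setReceiveTimeout(1_000); // shorten to 1 s, as in the example
        }
    }

    // Called by the "clock": fires ReceiveTimeout once the actor has been idle long enough.
    void tick(long nowMillis) {
        if (timeoutMillis > 0 && nowMillis - lastActivityMillis >= timeoutMillis) {
            events.add("ReceiveTimeout");
            cancelReceiveTimeout(); // the ReceiveTimeout handler cancels the timeout
        }
    }
}
```

Starting at t = 0, a tick at 5 s does nothing, "Hello" at 6 s shortens the timeout, a tick at 7 s fires ReceiveTimeout, and later ticks do nothing because the timeout was cancelled.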
Dispatchers
Dispatchers are at the core of Akka, and every ActorSystem has a default dispatcher.
We can specify in code which dispatcher an actor should use:
```java
ActorRef myActor =
    system.actorOf(Props.create(MyActor.class).withDispatcher("my-dispatcher"), "myactor3");
```
Here withDispatcher assigns the actor to the dispatcher with the given ID ("my-dispatcher"), which has to be defined in the configuration.
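There are three types of dispatcher:
Dispatcher: an event-based dispatcher; shareable.
PinnedDispatcher: dedicates a unique thread to each actor that uses it, i.e. each actor gets its own thread pool containing exactly one thread.
CallingThreadDispatcher: runs invocations only on the current thread and creates no new threads; shareable.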
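The thread-assignment difference between these three types can be illustrated with plain JDK executors. This is an analogy only: ToyDispatchers is a hypothetical class, not Akka's dispatcher implementation, which also manages mailboxes, throughput and configuration. It assumes a shared pool for Dispatcher, one single-thread pool per actor for PinnedDispatcher, and running the task inline on the caller's thread for CallingThreadDispatcher.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only (hypothetical, plain JDK executors): how each dispatcher type maps threads to actors.
final class ToyDispatchers {
    // Like "Dispatcher": one pool shared by many actors.
    static ExecutorService shared(int threads) {
        return Executors.newFixedThreadPool(threads);
    }

    // Like "PinnedDispatcher": a dedicated single-thread pool per actor.
    static ExecutorService pinnedForOneActor() {
        return Executors.newSingleThreadExecutor();
    }

    // Like "CallingThreadDispatcher": no new thread; work runs on the caller's thread.
    static Executor callingThread() {
        return Runnable::run;
    }

    // Runs one task on the executor and returns the name of the thread that ran it.
    static String threadNameOf(Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName(), executor)
            .join();
    }
}
```

With a pinned executor per actor, repeated tasks for the same actor always report the same thread name, a second actor reports a different one, and the calling-thread executor reports the caller's own thread. Executor services created this way should be shut down when no longer needed.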
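I will stop the discussion of dispatchers here: there is a lot to cover and I have not studied this part closely yet. When I need it, I will look up examples in the official documentation.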