Flink Source Code Analysis (3)
2024-12-18 10:16:00 # Flink # Source Code Analysis # JobManager Startup Flow # 1. Flink Startup Script Analysis

Flink Source Code Analysis (3)

Flink Cluster Startup Script Analysis

RPC Component Flow

RPC is essentially a remote procedure call protocol: a client invokes a program running on a remote server.

Let's first walk through the flow of the RPC components.

Flink's RPC code lives mainly in the rpc package of the flink-runtime module.

In this package you can also see the Akka-related components. The four most important APIs involved are the following:

  • RpcGateway

    It declares two methods, getAddress and getHostname, which return the address and the hostname.

    Most importantly, every component that takes part in communication must implement this gateway so that its IP address and hostname can be obtained.

    In short, RpcGateway provides the methods for obtaining an RpcEndpoint's address.

  • RpcService

    This holds the methods for starting and stopping services and establishing connections, i.e. the communication primitives.

  • RpcServer

    This is the object an RpcService returns after it has started an RpcEndpoint.

  • RpcEndpoint

    This is the most important one. You can think of it simply as an actor in the Akka framework: it is the unit that messages are sent to and from.

    RpcEndpoint has five important subclasses:

    TaskExecutor, JobMaster, Dispatcher, ResourceManager, YarnResourceManager


    Below is a flow chart I found online; I will explain the following content in detail based on it.
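
To make the division of labor concrete, here is a minimal sketch of how the four abstractions fit together. This is a simplification for illustration only; the real interfaces in org.apache.flink.runtime.rpc return futures and carry more methods, so these are not the actual signatures.

// Simplified sketch of Flink's four RPC abstractions (not the real signatures).

// Every remotely callable component exposes where it can be reached.
interface RpcGateway {
    String getAddress();
    String getHostname();
}

// The transport layer: starts endpoints and connects to remote gateways.
interface RpcService {
    RpcServer startServer(RpcEndpoint endpoint);
    <G extends RpcGateway> G connect(String address, Class<G> gatewayClass);
    void stopService();
}

// The handle returned once an endpoint has been started; usable as a gateway itself.
interface RpcServer extends RpcGateway {
    void stop();
}

// Base class of the actual components (TaskExecutor, JobMaster, Dispatcher,
// ResourceManager, ...). Conceptually one actor: its RPC methods are executed
// by a single main thread.
abstract class RpcEndpoint implements RpcGateway {
    protected final RpcService rpcService;
    protected final RpcServer rpcServer;

    protected RpcEndpoint(RpcService rpcService) {
        this.rpcService = rpcService;
        this.rpcServer = rpcService.startServer(this); // register with the service
    }

    @Override
    public String getAddress() {
        return rpcServer.getAddress();
    }

    @Override
    public String getHostname() {
        return rpcServer.getHostname();
    }
}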

Startup Script Analysis

First, go to the flink-dist module; it contains all of the shell scripts.

Below I first list the scripts we need, and then walk through the analysis.

  • Locate flink-dist
  • start-cluster.sh
start-cluster.sh (the cluster startup script)

#!/usr/bin/env bash

# 1. Get the directory this script lives in
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# 2. Load the configuration first
. "$bin"/config.sh

# 3. Start the JobManager
# Start the JobManager instance(s)
shopt -s nocasematch

# 4. Determine the JobManager startup mode: HA cluster or single node
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA mode. 5. For a ZooKeeper-based cluster, call readMasters
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # 6. All masters are local: start directly
        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # 7. Start remotely in the background via SSH
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

    # 8. Start a single JobManager on this node
    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# 9. Start the TaskManager instances
TMSlaves start
  • Load config.sh
config.sh
#!/usr/bin/env bash

constructFlinkClassPath() {
local FLINK_DIST
local FLINK_CLASSPATH

while read -d '' -r jarfile ; do
if [[ "$jarfile" =~ .*flink-dist.*.jar ]]; then
FLINK_DIST="$FLINK_DIST":"$jarfile"
elif [[ "$FLINK_CLASSPATH" == "" ]]; then
FLINK_CLASSPATH="$jarfile";
else
FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
fi
done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

if [[ "$FLINK_DIST" == "" ]]; then
# write error message to stderr since stdout is stored as the classpath
(>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")

# exit function with empty classpath to force process failure
exit 1
fi

echo "$FLINK_CLASSPATH""$FLINK_DIST"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
# "cygpath" can do the conversion.
manglePath() {
UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
echo `cygpath -w "$1"`
else
echo $1
fi
}

manglePathList() {
UNAME=$(uname -s)
# a path list, for example a java classpath
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
echo `cygpath -wp "$1"`
else
echo $1
fi
}

# Looks up a config value by key from a simple YAML-style key-value map.
# $1: key to look up
# $2: default value to return if key does not exist
# $3: config file to read from
readFromConfig() {
local key=$1
local defaultValue=$2
local configFile=$3

# first extract the value with the given key (1st sed), then trim the result (2nd sed)
# if a key exists multiple times, take the "last" one (tail)
local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`

[ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

########################################################################################################################
# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
# -or- the respective environment variables are not set.
########################################################################################################################


# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary

########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
########################################################################################################################

KEY_JOBM_MEM_SIZE="jobmanager.heap.size"
KEY_JOBM_MEM_MB="jobmanager.heap.mb"
KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
KEY_TASKM_MEM_MB="taskmanager.heap.mb"
KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"

KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

KEY_ENV_PID_DIR="env.pid.dir"
KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_HIGH_AVAILABILITY="high-availability"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"

########################################################################################################################
# MEMORY SIZE UNIT
########################################################################################################################

BYTES_UNITS=("b" "bytes")
KILO_BYTES_UNITS=("k" "kb" "kibibytes")
MEGA_BYTES_UNITS=("m" "mb" "mebibytes")
GIGA_BYTES_UNITS=("g" "gb" "gibibytes")
TERA_BYTES_UNITS=("t" "tb" "tebibytes")

hasUnit() {
text=$1

trimmed=$(echo -e "${text}" | tr -d '[:space:]')

if [ -z "$trimmed" -o "$trimmed" == " " ]; then
echo "$trimmed is an empty- or whitespace-only string"
exit 1
fi

len=${#trimmed}
pos=0

while [ $pos -lt $len ]; do
current=${trimmed:pos:1}
if [[ ! $current < '0' ]] && [[ ! $current > '9' ]]; then
let pos+=1
else
break
fi
done

number=${trimmed:0:pos}

unit=${trimmed:$pos}
unit=$(echo -e "${unit}" | tr -d '[:space:]')
unit=$(echo -e "${unit}" | tr '[A-Z]' '[a-z]')

[[ ! -z "$unit" ]]
}

parseBytes() {
text=$1

trimmed=$(echo -e "${text}" | tr -d '[:space:]')

if [ -z "$trimmed" -o "$trimmed" == " " ]; then
echo "$trimmed is an empty- or whitespace-only string"
exit 1
fi

len=${#trimmed}
pos=0

while [ $pos -lt $len ]; do
current=${trimmed:pos:1}
if [[ ! $current < '0' ]] && [[ ! $current > '9' ]]; then
let pos+=1
else
break
fi
done

number=${trimmed:0:pos}

unit=${trimmed:$pos}
unit=$(echo -e "${unit}" | tr -d '[:space:]')
unit=$(echo -e "${unit}" | tr '[A-Z]' '[a-z]')

if [ -z "$number" ]; then
echo "text does not start with a number"
exit 1
fi

local multiplier
if [ -z "$unit" ]; then
multiplier=1
else
if matchesAny $unit "${BYTES_UNITS[*]}"; then
multiplier=1
elif matchesAny $unit "${KILO_BYTES_UNITS[*]}"; then
multiplier=1024
elif matchesAny $unit "${MEGA_BYTES_UNITS[*]}"; then
multiplier=`expr 1024 \* 1024`
elif matchesAny $unit "${GIGA_BYTES_UNITS[*]}"; then
multiplier=`expr 1024 \* 1024 \* 1024`
elif matchesAny $unit "${TERA_BYTES_UNITS[*]}"; then
multiplier=`expr 1024 \* 1024 \* 1024 \* 1024`
else
echo "[ERROR] Memory size unit $unit does not match any of the recognized units"
exit 1
fi
fi

((result=$number * $multiplier))

if [ $[result / multiplier] != "$number" ]; then
echo "[ERROR] The value $text cannot be re represented as 64bit number of bytes (numeric overflow)."
exit 1
fi

echo "$result"
}

matchesAny() {
str=$1
variants=$2

for s in ${variants[*]}; do
if [ $str == $s ]; then
return 0
fi
done

return 1
}

getKibiBytes() {
bytes=$1
echo "$(($bytes >>10))"
}

getMebiBytes() {
bytes=$1
echo "$(($bytes >> 20))"
}

getGibiBytes() {
bytes=$1
echo "$(($bytes >> 30))"
}

getTebiBytes() {
bytes=$1
echo "$(($bytes >> 40))"
}

########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
if [ "$iteration" -gt 100 ]; then
echo "Cannot resolve path: You have a cyclic symlink in $target."
break
fi
ls=`ls -ld -- "$target"`
target=`expr "$ls" : '.* -> \(.*\)$'`
iteration=$((iteration + 1))
done

# Convert relative path to absolute path and resolve directory symlinks
bin=`dirname "$target"`
SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`

# Define the main directory of the flink installation
# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
if [ -z "$_FLINK_HOME_DETERMINED" ]; then
FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
fi
FLINK_LIB_DIR=$FLINK_HOME/lib
FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
FLINK_OPT_DIR=$FLINK_HOME/opt


# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
FLINK_CONF_FILE="flink-conf.yaml"
YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}

### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

########################################################################################################################
# ENVIRONMENT VARIABLES
########################################################################################################################

# read JAVA_HOME from config with no default value
MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
# check if config specified JAVA_HOME
if [ -z "${MY_JAVA_HOME}" ]; then
# config did not specify JAVA_HOME. Use system JAVA_HOME
MY_JAVA_HOME=${JAVA_HOME}
fi
# check if we have a valid JAVA_HOME and if java is not available
if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
exit 1
else
JAVA_HOME=${MY_JAVA_HOME}
fi

UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
JAVA_RUN=java
else
if [[ -d $JAVA_HOME ]]; then
JAVA_RUN=$JAVA_HOME/bin/java
else
JAVA_RUN=java
fi
fi

# Define HOSTNAME if it is not already set
if [ -z "${HOSTNAME}" ]; then
HOSTNAME=`hostname`
fi

IS_NUMBER="^[0-9]+$"

# Define FLINK_JM_HEAP if it is not already set
if [ -z "${FLINK_JM_HEAP}" ]; then
FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_JM_HEAP}" == 0 ]; then
FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi

# Define FLINK_TM_HEAP if it is not already set
if [ -z "${FLINK_TM_HEAP}" ]; then
FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_TM_HEAP}" == 0 ]; then
FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")

if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
else
FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
fi
fi

# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
fi

# Define FLINK_TM_OFFHEAP if it is not already set
if [ -z "${FLINK_TM_OFFHEAP}" ]; then
FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
fi


# Define FLINK_TM_NET_BUF_FRACTION if it is not already set
if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
fi

# Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
fi
fi

# Define FLINK_TM_NET_BUF_MIN if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
# default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
fi

# Define FLINK_TM_NET_BUF_MAX if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
# default: 1GB = 1073741824 bytes
FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
fi


# Verify that NUMA tooling is available
command -v numactl >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
FLINK_TM_COMPUTE_NUMA="false"
else
# Define FLINK_TM_COMPUTE_NUMA if it is not already set
if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
fi
fi

if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
fi

if [ -z "${FLINK_LOG_DIR}" ]; then
FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
fi

if [ -z "${YARN_CONF_DIR}" ]; then
YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HADOOP_CONF_DIR}" ]; then
HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_PID_DIR}" ]; then
FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")

# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_SSH_OPTS}" ]; then
FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
fi

# Define ZK_HEAP if it is not already set
if [ -z "${ZK_HEAP}" ]; then
ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
fi

# High availability
if [ -z "${HIGH_AVAILABILITY}" ]; then
HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
if [ -z "${HIGH_AVAILABILITY}" ]; then
# Try deprecated value
DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
if [ -z "${DEPRECATED_HA}" ]; then
HIGH_AVAILABILITY="none"
elif [ ${DEPRECATED_HA} == "standalone" ]; then
# Standalone is now 'none'
HIGH_AVAILABILITY="none"
else
HIGH_AVAILABILITY=${DEPRECATED_HA}
fi
fi
fi

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
JVM_ARGS=""
fi

# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
if [ -n "$HADOOP_HOME" ]; then
# HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
if [ -d "$HADOOP_HOME/conf" ]; then
# its a Hadoop 1.x
HADOOP_CONF_DIR="$HADOOP_HOME/conf"
fi
if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
# Its Hadoop 2.2+
HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
fi
fi
fi

# try and set HADOOP_CONF_DIR to some common default if it's not set
if [ -z "$HADOOP_CONF_DIR" ]; then
if [ -d "/etc/hadoop/conf" ]; then
echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set."
HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
fi

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

if [ -n "${HBASE_CONF_DIR}" ]; then
INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxilliary function which extracts the name of host from a line which
# also potentially includes topology information and the taskManager type
extractHostName() {
# handle comments: extract first part of string (before first # character)
SLAVE=`echo $1 | cut -d'#' -f 1`

# Extract the hostname from the network hierarchy
if [[ "$SLAVE" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
SLAVE=${BASH_REMATCH[1]}
fi

echo $SLAVE
}

# Auxilliary functions for log file rotation
rotateLogFilesWithPrefix() {
dir=$1
prefix=$2
while read -r log ; do
rotateLogFile "$log"
# find distinct set of log file names, ignoring the rotation number (trailing dot and digit)
done < <(find "$dir" ! -type d -path "${prefix}*" | sed -E s/\.[0-9]+$// | sort | uniq)
}

rotateLogFile() {
log=$1;
num=$MAX_LOG_FILE_NUMBER
if [ -f "$log" -a "$num" -gt 0 ]; then
while [ $num -gt 1 ]; do
prev=`expr $num - 1`
[ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
num=$prev
done
mv "$log" "$log.$num";
fi
}

readMasters() {
MASTERS_FILE="${FLINK_CONF_DIR}/masters"

if [[ ! -f "${MASTERS_FILE}" ]]; then
echo "No masters file. Please specify masters in 'conf/masters'."
exit 1
fi

MASTERS=()
WEBUIPORTS=()

MASTERS_ALL_LOCALHOST=true
GOON=true
while $GOON; do
read line || GOON=false
HOSTWEBUIPORT=$( extractHostName $line)

if [ -n "$HOSTWEBUIPORT" ]; then
HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
MASTERS+=(${HOST})

if [ -z "$WEBUIPORT" ]; then
WEBUIPORTS+=(0)
else
WEBUIPORTS+=(${WEBUIPORT})
fi

if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
MASTERS_ALL_LOCALHOST=false
fi
fi
done < "$MASTERS_FILE"
}

# 12. Iterate over the nodes listed in the slaves file
readSlaves() {
SLAVES_FILE="${FLINK_CONF_DIR}/slaves"

if [[ ! -f "$SLAVES_FILE" ]]; then
echo "No slaves file. Please specify slaves in 'conf/slaves'."
exit 1
fi

SLAVES=()

SLAVES_ALL_LOCALHOST=true
GOON=true
while $GOON; do
read line || GOON=false
HOST=$( extractHostName $line)
if [ -n "$HOST" ] ; then
# Collect all slave hosts
SLAVES+=(${HOST})
# Check whether this is an all-local setup
if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
SLAVES_ALL_LOCALHOST=false
fi
fi
done < "$SLAVES_FILE"
}

# starts or stops TMs on all slaves
# TMSlaves start|stop
# 10. Start all TaskManagers
TMSlaves() {
CMD=$1

# 11. Read the slaves configuration file
readSlaves

if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then
# all-local setup
for slave in ${SLAVES[@]}; do
# 13. Start directly on the local machine
"${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
done
else
# non-local setup
# Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
command -v pdsh >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
for slave in ${SLAVES[@]}; do
# 14. Start remotely via SSH
ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
done
else
PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
fi
fi
}

useOffHeapMemory() {
[[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
}

HAVE_AWK=
# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config)
calculateNetworkBufferMemory() {
local network_buffers_bytes
if [ "${FLINK_TM_HEAP_MB}" -le "0" ]; then
echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
exit 1
fi

if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
# fix memory size for network buffers
network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
else
if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
echo "Min must be less than or equal to max."
echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
exit 1
fi

# Bash only performs integer arithmetic so floating point computation is performed using awk
if [[ -z "${HAVE_AWK}" ]] ; then
command -v awk >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
echo "[ERROR] Program 'awk' not found."
echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
exit 1
fi
HAVE_AWK=true
fi

# We calculate the memory using a fraction of the total memory
if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
echo "It must be between 0.0 and 1.0."
echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
exit 1
fi

network_buffers_bytes=`awk "BEGIN { x = ${FLINK_TM_HEAP_MB} * 1048576 * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
fi

# recalculate the JVM heap memory by taking the network buffers into account
local tm_heap_size_bytes=$((${FLINK_TM_HEAP_MB} << 20)) # megabytes to bytes
if [[ "${tm_heap_size_bytes}" -le "${network_buffers_bytes}" ]]; then
echo "[ERROR] Configured TaskManager memory size (${FLINK_TM_HEAP_MB} MB, from '${KEY_TASKM_MEM_SIZE}') must be larger than the network buffer memory size (${network_buffers_bytes} bytes, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')."
exit 1
fi

echo ${network_buffers_bytes}
}

# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
calculateTaskManagerHeapSizeMB() {
if [ "${FLINK_TM_HEAP_MB}" -le "0" ]; then
echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
exit 1
fi

local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
# network buffers are always off-heap and thus need to be deduced from the heap memory size
local tm_heap_size_mb=$((${FLINK_TM_HEAP_MB} - network_buffers_mb))

if useOffHeapMemory; then

if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
# We split up the total memory in heap and off-heap memory
if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP_MB} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')."
exit 1
fi

tm_heap_size_mb=$((tm_heap_size_mb - FLINK_TM_MEM_MANAGED_SIZE))
else
# Bash only performs integer arithmetic so floating point computation is performed using awk
if [[ -z "${HAVE_AWK}" ]] ; then
command -v awk >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
echo "[ERROR] Program 'awk' not found."
echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
exit 1
fi
HAVE_AWK=true
fi

# We calculate the memory using a fraction of the total memory
if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
echo "It must be between 0.0 and 1.0."
echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
exit 1
fi

# recalculate the JVM heap memory by taking the off-heap ratio into account
local offheap_managed_memory_size=`awk "BEGIN { printf \"%.0f\n\", ${tm_heap_size_mb} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
tm_heap_size_mb=$((tm_heap_size_mb - offheap_managed_memory_size))
fi
fi

echo ${tm_heap_size_mb}
}

  • The final startup commands live in:

    • jobmanager.sh
    # Start/stop a Flink JobManager.
    USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

    STARTSTOP=$1
    HOST=$2 # optional when starting multiple instances
    WEBUIPORT=$3 # optional when starting multiple instances

    #16. Parameter validation
    if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    echo $USAGE
    exit 1
    fi

    #17. Load config.sh
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`

    . "$bin"/config.sh

    #18. The entrypoint type the JobManager starts with
    ENTRYPOINT=standalonesession

    if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
    echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
    else
    flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
    FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
    fi

    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
    echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
    exit 1
    fi

    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
    export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
    fi

    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"

    # Startup parameters
    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
    args+=("--host")
    args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
    args+=("--webui-port")
    args+=("${WEBUIPORT}")
    fi
    fi

    if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
    else
    #19. Start standalonesession via the flink-daemon.sh script
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
    fi
    • taskmanager.sh

    # Start/stop a Flink TaskManager.
    USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

    STARTSTOP=$1

    ARGS=("${@:2}")

    if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    echo $USAGE
    exit 1
    fi

    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`

    . "$bin"/config.sh

    #20. Define the entrypoint variable: taskexecutor
    ENTRYPOINT=taskexecutor

    if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    # if memory allocation mode is lazy and no other JVM options are set,
    # set the 'Concurrent Mark Sweep GC'
    if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
    else
    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
    fi

    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
    echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
    exit 1
    fi

    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then

    TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
    # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
    TM_MAX_OFFHEAP_SIZE="8388607T"

    export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"

    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    ARGS+=("--configDir" "${FLINK_CONF_DIR}")
    fi

    if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
    else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
    # Start a single TaskManager
    #flink-daemon.sh taskexecutor
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
    # Example output from `numactl --show` on an AWS c4.8xlarge:
    # policy: default
    # preferred node: current
    # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
    # cpubind: 0 1
    # nodebind: 0 1
    # membind: 0 1
    read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
    for NODE_ID in "${NODE_LIST[@]:1}"; do
    # Start a TaskManager for each NUMA node
    numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    done
    fi
    fi
    • flink-daemon.sh
    #!/usr/bin/env bash

    # Start/stop a Flink daemon.
    USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

    STARTSTOP=$1
    DAEMON=$2
    ARGS=("${@:3}") # get remaining arguments as array

    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`

    . "$bin"/config.sh

    case $DAEMON in
    (taskexecutor)
    #21. The class that starts the TaskExecutor: TaskManagerRunner
    CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
    CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
    CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
    #22. The class that starts the JobManager: StandaloneSessionClusterEntrypoint
    CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
    CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;

    (*)
    echo "Unknown daemon '${DAEMON}'. $USAGE."
    exit 1
    ;;
    esac

    if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
    fi

    FLINK_TM_CLASSPATH=`constructFlinkClassPath`

    pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid

    mkdir -p "$FLINK_PID_DIR"

    # Log files for daemons are indexed from the process ID's position in the PID
    # file. The following lock prevents a race condition during daemon startup
    # when multiple daemons read, index, and write to the PID file concurrently.
    # The lock is created on the PID directory since a lock file cannot be safely
    # removed. The daemon is started with the lock closed and the lock remains
    # active in this script until the script exits.
    command -v flock >/dev/null 2>&1
    if [[ $? -eq 0 ]]; then
    exec 200<"$FLINK_PID_DIR"
    flock 200
    fi

    # Ascending ID depending on number of lines in pid file.
    # This allows us to start multiple daemon of each type.
    id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")

    FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
    log="${FLINK_LOG_PREFIX}.log"
    out="${FLINK_LOG_PREFIX}.out"

    log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

    JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

    # Only set JVM 8 arguments if we have correctly extracted the version
    if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
    JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
    fi

    case $STARTSTOP in

    (start)
    # Rotate log files
    rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"

    # Print a warning if daemons are already running on host
    if [ -f "$pid" ]; then
    active=()
    while IFS='' read -r p || [[ -n "$p" ]]; do
    kill -0 $p >/dev/null 2>&1
    if [ $? -eq 0 ]; then
    active+=($p)
    fi
    done < "${pid}"

    count="${#active[@]}"

    if [ ${count} -gt 0 ]; then
    echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
    fi
    fi

    # Evaluate user options for local variable expansion
    FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

    echo "Starting $DAEMON daemon on host $HOSTNAME."
    $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

    mypid=$!

    # Add to pid file if successful start
    if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
    echo $mypid >> "$pid"
    else
    echo "Error starting $DAEMON daemon."
    exit 1
    fi
    ;;

    (stop)
    if [ -f "$pid" ]; then
    # Remove last in pid file
    to_stop=$(tail -n 1 "$pid")

    if [ -z $to_stop ]; then
    rm "$pid" # If all stopped, clean up pid file
    echo "No $DAEMON daemon to stop on host $HOSTNAME."
    else
    sed \$d "$pid" > "$pid.tmp" # all but last line

    # If all stopped, clean up pid file
    [ $(wc -l < "$pid.tmp") -eq 0 ] && rm "$pid" "$pid.tmp" || mv "$pid.tmp" "$pid"

    if kill -0 $to_stop > /dev/null 2>&1; then
    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
    kill $to_stop
    else
    echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
    fi
    fi
    else
    echo "No $DAEMON daemon to stop on host $HOSTNAME."
    fi
    ;;

    (stop-all)
    if [ -f "$pid" ]; then
    mv "$pid" "${pid}.tmp"

    while read to_stop; do
    if kill -0 $to_stop > /dev/null 2>&1; then
    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
    kill $to_stop
    else
    echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
    fi
    done < "${pid}.tmp"
    rm "${pid}.tmp"
    fi
    ;;

    (*)
    echo "Unexpected argument '$STARTSTOP'. $USAGE."
    exit 1
    ;;
    esac

Script Walkthrough

In start-cluster.sh, the script first resolves the directory it lives in.

Then it loads the configuration; this happens in config.sh.

Next it determines the JobManager startup mode: single-node or cluster.

Cluster startup here means the ZooKeeper-based HA mode.

In cluster mode, readMasters is called.

readMasters is defined in config.sh.

readMasters simply reads the entries out of the masters file: for each entry it extracts the hostname and the web UI port, and checks whether the host is local. If any host is not local, it sets the MASTERS_ALL_LOCALHOST flag to false.

for ((i=0;i<${#MASTERS[@]};++i)); do
master=${MASTERS[i]}
webuiport=${WEBUIPORTS[i]}

Here the masters and web UI ports collected earlier are read back out.

if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
"${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"

If all masters are local, the jobmanager.sh start script is invoked directly.

#7. Start remotely in the background via SSH
ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"

Otherwise the JobManager is started remotely in the background over SSH.

else
echo "Starting cluster."

#8. Start the JobManager on this node
# Start single JobManager on this machine
"$FLINK_BIN_DIR"/jobmanager.sh start
fi

If we are not in the ZooKeeper cluster mode described above, we are in single-node mode, so jobmanager.sh start simply runs on the local machine.

# 9. Start the TaskManager instances
TMSlaves start

Once jobmanager.sh has been started, TMSlaves is run, which starts the TaskManagers.

TMSlaves is also defined in config.sh; let's look at its contents.

# 12. Iterate over the nodes listed in the slaves file
readSlaves() {
SLAVES_FILE="${FLINK_CONF_DIR}/slaves"

if [[ ! -f "$SLAVES_FILE" ]]; then
echo "No slaves file. Please specify slaves in 'conf/slaves'."
exit 1
fi

SLAVES=()

SLAVES_ALL_LOCALHOST=true
GOON=true
while $GOON; do
read line || GOON=false
HOST=$( extractHostName $line)
if [ -n "$HOST" ] ; then
# Collect all slave hosts
SLAVES+=(${HOST})
# Check whether this is an all-local setup
if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
SLAVES_ALL_LOCALHOST=false
fi
fi
done < "$SLAVES_FILE"
}

# starts or stops TMs on all slaves
# TMSlaves start|stop

# 10. Start all TaskManagers
TMSlaves() {
CMD=$1

# 11. Read the slaves configuration file
readSlaves

if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then
# all-local setup
for slave in ${SLAVES[@]}; do
# 13. Start directly on the local machine
"${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
done
else
# non-local setup
# Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
command -v pdsh >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
for slave in ${SLAVES[@]}; do
# 14. Start remotely via SSH
ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
done
else
PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \
"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
fi
fi
}

CMD receives the "start" argument passed in from start-cluster.sh.

readSlaves locates the slaves file, starts with the all-local flag set to true, extracts each hostname, checks whether each address is local, and flips the flag if any host is remote.

Then, if the flag is true (all local), the TaskManagers are started directly on this machine; otherwise they are started remotely.

Either way, what actually runs is taskmanager.sh start.

That concludes the walkthrough of the cluster startup script. Next, let's look at the scripts that actually start the JobManager and the TaskManager.

jobmanager.sh

#16. Parameter validation
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
exit 1
fi
#17. Load config.sh
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

First comes parameter validation.

Then config.sh is sourced.

ENTRYPOINT=standalonesession

This is a crucial step; we will see why shortly.

if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
#19. Start standalonesession via the flink-daemon.sh script
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi

So here standalonesession is started via flink-daemon.sh.

taskmanager.sh

The beginning is the same as in jobmanager.sh: the configuration is loaded from config.sh, and then comes the key step.

#20. Define the entrypoint variable: taskexecutor
ENTRYPOINT=taskexecutor

This, again, is the crucial step.

if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
# Start a single TaskManager
#flink-daemon.sh taskexecutor
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
# preferred node: current
# physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# cpubind: 0 1
# nodebind: 0 1
# membind: 0 1
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
done

Here, too, flink-daemon.sh is used to start the taskexecutor.

Now let's look in detail at what the flink-daemon.sh script does.

(taskexecutor)
#21. The class that starts the TaskExecutor: TaskManagerRunner
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
;;

(standalonesession)
#22. The class that starts the JobManager: StandaloneSessionClusterEntrypoint
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
;;

TaskManagerRunner is the class that starts the TaskManager.
StandaloneSessionClusterEntrypoint is the class that starts the JobManager.

So the scripts ultimately invoke startup classes in the Java code. Next, let's look at how those startup classes are written.

JobManager Startup Flow
JobManager Components

First, some background:

  • A Flink cluster consists of a Flink Master and multiple TaskManagers.

  • A Flink Master contains one ResourceManager and multiple JobManagers; one JobManager corresponds to one job. Inside a JobManager, the Scheduler schedules the tasks of the job's DAG and sends out resource requests, and the Slot Pool holds the slots allocated to that job. The ResourceManager in the Flink Master is responsible for resource allocation.

  • Once started, a TaskManager continuously sends heartbeats to the ResourceManager.

  • Dispatcher

    • The Dispatcher provides a REST interface for receiving job submissions; it is responsible for starting JobManagers, submitting jobs to them, and serving the web UI.
    • It is effectively the middle layer between the client and the JobManager.
  • JobManager

    • Responsible for the actual execution of a job
    • Receives a Flink job, schedules its tasks, and manages the TaskManagers
    • It is only started once a job has been received
  • WebMonitorEndpoint

    • After a client submits a request, it finds the appropriate Handler to process that request.
    • For example, when a user opens http://localhost:8081/jobs in a browser to check job status, the WebMonitorEndpoint inspects the request's URL path; /jobs, say, is the jobs-overview endpoint. Based on the path, the WebMonitorEndpoint selects the matching Handler, and different Handlers implement different functions: /jobs (overview of all jobs) is typically handled by JobsOverviewHandler, and /job/{jobid} (details of a specific job) by JobDetailHandler. The request is then handed to that Handler, as sketched after this list.
  • The WebMonitorEndpoint interacts with external clients (web UI, CLI), receiving and dispatching HTTP requests.

  • The Dispatcher is responsible for scheduling Flink jobs and managing their lifecycle, including execution, suspension, and recovery.

The main difference between the two: the WebMonitorEndpoint focuses on handling external requests and interacting with clients, while the Dispatcher focuses on scheduling and executing jobs.
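
To make the path-to-Handler dispatch concrete, here is a small hypothetical sketch. The class and the handler wiring are invented for illustration; Flink's real handlers are registered inside the WebMonitorEndpoint with its own router, not written like this.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of URL-path-to-handler dispatch.
public class HandlerRouterSketch {
    private final Map<String, Function<String, String>> handlers = new LinkedHashMap<>();

    public HandlerRouterSketch() {
        handlers.put("/jobs", path -> "jobs overview");      // cf. JobsOverviewHandler
        handlers.put("/job/", path -> "details of " + path); // cf. JobDetailHandler
    }

    public String route(String path) {
        // pick the first handler whose registered prefix matches the request path
        for (Map.Entry<String, Function<String, String>> e : handlers.entrySet()) {
            if (path.startsWith(e.getKey())) {
                return e.getValue().apply(path);
            }
        }
        return "404 Not Found";
    }

    public static void main(String[] args) {
        HandlerRouterSketch router = new HandlerRouterSketch();
        System.out.println(router.route("/jobs"));       // jobs overview
        System.out.println(router.route("/job/abc123")); // details of /job/abc123
    }
}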

StandaloneSessionClusterEntrypoint Walkthrough

Let's return to the code and take a look.

The first few lines simply log cluster startup information.

SignalHandler.register(LOG);

This handles OS-level matters: it catches system signals and runs shutdown logic when they arrive. We won't dig into it here.

/**
 * Register a shutdown hook: when the cluster crashes or is stopped,
 * the registered shutdown actions are executed to close the services.
 * Similar to a parent-child supervision hook of an Actor.
 * */
JvmShutdownSafeguard.installAsShutdownHook(LOG);

This registers shutdown hooks; when the cluster shuts down, the services are closed one by one.
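
JvmShutdownSafeguard.installAsShutdownHook builds on the standard JVM shutdown-hook mechanism. A plain-JDK illustration of that mechanism:

// Demonstrates the JVM shutdown-hook mechanism the safeguard relies on.
public class ShutdownHookDemo {
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Runs when the JVM shuts down (System.exit, SIGTERM, ...)
            System.out.println("cleaning up services before the JVM exits");
        }));
        System.out.println("running; the hook fires on shutdown");
    }
}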

/**
 * Step 2:
 * Parse the cluster startup arguments,
 * e.g. the key-value options passed to flink run -c
 * */
entrypointClusterConfiguration = commandLineParser.parse(args);

This parses the command-line arguments, e.g. the -c or -r options appended when starting the JobManager. Let's step into it.

	public T parse(@Nonnull String[] args) throws FlinkParseException {
final DefaultParser parser = new DefaultParser();
final Options options = parserResultFactory.getOptions();

final CommandLine commandLine;
try {
commandLine = parser.parse(options, args, true);
} catch (ParseException e) {
throw new FlinkParseException("Failed to parse the command line arguments.", e);
}

return parserResultFactory.createResult(commandLine);
}
//The createResult returned at the end is an interface method; step into its override in the EntrypointClusterConfigurationParserFactory factory
@Override
public EntrypointClusterConfiguration createResult(@Nonnull CommandLine commandLine) {
final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final String restPortStr = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
final int restPort = Integer.parseInt(restPortStr);
final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());

return new EntrypointClusterConfiguration(
configDir,
dynamicProperties,
commandLine.getArgs(),
hostname,
restPort);
}
//Now step into these option definitions
public static final Option CONFIG_DIR_OPTION = Option.builder("c")
.longOpt("configDir")
.required(true)
.hasArg(true)
.argName("configuration directory")
.desc("Directory which contains the configuration file flink-conf.yml.")
.build();

public static final Option REST_PORT_OPTION = Option.builder("r")
.longOpt("webui-port")
.required(false)
.hasArg(true)
.argName("rest port")
.desc("Port for the rest endpoint and the web UI.")
.build();

public static final Option DYNAMIC_PROPERTY_OPTION = Option.builder("D")
.argName("property=value")
.numberOfArgs(2)
.valueSeparator('=')
.desc("use value for given property")
.build();
//Here you can see that these option definitions are what parse the arguments passed on the command line
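
The parser above is Apache Commons CLI. Here is a self-contained sketch of that same API; the argument values are invented for illustration:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

// Demonstrates the commons-cli calls used by the parser above.
public class CliParseDemo {
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        options.addOption(Option.builder("c").longOpt("configDir").required(true).hasArg(true).build());
        options.addOption(Option.builder("r").longOpt("webui-port").hasArg(true).build());
        options.addOption(Option.builder("D").numberOfArgs(2).valueSeparator('=').build());

        String[] fakeArgs = {"--configDir", "/opt/flink/conf", "--webui-port", "8081", "-Drest.address=node1"};
        // true = stop parsing at the first unrecognized argument, as in parse(options, args, true) above
        CommandLine cmd = new DefaultParser().parse(options, fakeArgs, true);

        System.out.println(cmd.getOptionValue("c"));       // /opt/flink/conf
        System.out.println(cmd.getOptionValue("r", "-1")); // 8081
        System.out.println(cmd.getOptionProperties("D"));  // {rest.address=node1}
    }
}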


Now let's return to StandaloneSessionClusterEntrypoint.

		/**
 * Step 3:
 * Parse the core cluster configuration file.
 * See GlobalConfiguration.loadConfiguration:
 * File yamlConfigFile = new File(confDirFile, "flink-conf.yaml");
 * flink-conf.yaml
 * */
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
-----------------------------------------------------------------------------------------------------
protected static Configuration loadConfiguration(EntrypointClusterConfiguration entrypointClusterConfiguration) {
final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(entrypointClusterConfiguration.getDynamicProperties());
final Configuration configuration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir(), dynamicProperties);

// Set the REST port
final int restPort = entrypointClusterConfiguration.getRestPort();

if(restPort >= 0) {
configuration.setInteger(RestOptions.PORT, restPort);
}

// Set the hostname
final String hostname = entrypointClusterConfiguration.getHostname();

if(hostname != null) {
configuration.setString(JobManagerOptions.ADDRESS, hostname);
}

return configuration;
}

---------------------------------------------------------------------------------------------------------
public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {

if (configDir == null) {
throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
}

final File confDirFile = new File(configDir);
if (!(confDirFile.exists())) {
throw new IllegalConfigurationException(
"The given configuration directory name '" + configDir +
"' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory.");
}
// final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME); this is where the configuration file gets parsed
---------------------------------------------------------------------------------------------------
public final class GlobalConfiguration {

private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);

public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";

This code parses the contents of flink-conf.yaml into the Configuration object and sets the port.
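
As a quick usage sketch of the same loading logic, assuming flink-core is on the classpath (the configuration directory path is made up):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.RestOptions;

public class LoadConfDemo {
    public static void main(String[] args) {
        // Reads flink-conf.yaml from the given directory (path is illustrative)
        Configuration conf = GlobalConfiguration.loadConfiguration("/opt/flink/conf");
        // The same kind of override loadConfiguration() applies for --webui-port
        conf.setInteger(RestOptions.PORT, 8081);
        System.out.println(conf.getInteger(RestOptions.PORT, -1)); // 8081
    }
}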

/**
 * Step 4:
 * Create the StandaloneSessionClusterEntrypoint with the loaded configuration
 * and start it.
 */
StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
ClusterEntrypoint.runClusterEntrypoint(entrypoint);

This creates the entrypoint with the loaded configuration and starts it.

Let's step into this method.

/**
 * Step 5:
 * Start the StandaloneSessionClusterEntrypoint.
 */
clusterEntrypoint.startCluster();

This starts the StandaloneSessionClusterEntrypoint.

Let's step in once more.

/**
 * Step 6:
 * Start the plugin manager, which supports third-party plugins and loads them separately.
 * /root/install/flink-1.19.2/plugins
 * */
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

This adds support for third-party plugins: the Flink distribution ships with a plugins directory, and anything placed there can be used as a plugin. Loading plugins this way keeps third-party code separated from Flink's own classpath.
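
That separation comes from giving each plugin its own classloader. A plain-JDK illustration of the idea (the jar path and class name are made up; Flink's PluginManager builds its own loader hierarchy on the same principle):

import java.net.URL;
import java.net.URLClassLoader;

public class PluginIsolationDemo {
    public static void main(String[] args) throws Exception {
        URL pluginJar = new URL("file:///root/install/flink-1.19.2/plugins/demo/demo-plugin.jar");
        // parent = null: only bootstrap (JDK) classes are visible to the plugin,
        // so plugin classes cannot clash with classes on the main classpath
        try (URLClassLoader pluginLoader = new URLClassLoader(new URL[]{pluginJar}, null)) {
            Class<?> pluginClass = Class.forName("com.example.DemoPlugin", true, pluginLoader);
            System.out.println("loaded in isolation: " + pluginClass.getName());
        }
    }
}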

/**
 * Step 7:
 * Initialize the FileSystem instances based on the configuration:
 * FileSystem.initialize(configuration, pluginManager);
 * LocalFileSystem  - the local file system
 * HadoopFileSystem - Flink's wrapper around the Hadoop FileSystem
 * These are the targets that checkpoints are written to...
 * */
configureFileSystems(configuration, pluginManager);

Let's step in:

private void configureFileSystems(Configuration configuration, PluginManager pluginManager) {
LOG.info("Install default filesystem.");
// Initialize the file systems
FileSystem.initialize(configuration, pluginManager);
}
--------------------------------
public static void initialize(
Configuration config,
PluginManager pluginManager) throws IllegalConfigurationException {
LOCK.lock();
try {
// make sure file systems are re-instantiated after re-configuration
CACHE.clear();
FS_FACTORIES.clear();
Collection<Supplier<Iterator<FileSystemFactory>>> factorySuppliers = new ArrayList<>(2);
factorySuppliers.add(() -> ServiceLoader.load(FileSystemFactory.class).iterator());
--------------------------------------------------------
//Step into FileSystemFactory here to see which file systems are available

Here we see some familiar file systems. configureFileSystems(configuration, pluginManager) is really just initializing our file systems: the instances are created up front because later operations, such as saving checkpoints, need a file system to write to.
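
The ServiceLoader.load call above is the JDK's standard SPI discovery mechanism. A self-contained illustration (StorageFactory is an invented stand-in for FileSystemFactory):

import java.util.ServiceLoader;

// Invented SPI interface standing in for Flink's FileSystemFactory.
interface StorageFactory {
    String scheme();
}

public class SpiDemo {
    public static void main(String[] args) {
        // ServiceLoader instantiates every implementation listed in
        // META-INF/services/StorageFactory on the classpath, which is how
        // FileSystem.initialize discovers the available file systems.
        for (StorageFactory factory : ServiceLoader.load(StorageFactory.class)) {
            System.out.println("found file system scheme: " + factory.scheme());
        }
    }
}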

Continuing further down:

/**
 * Load the security-related configuration.
 * */
SecurityContext securityContext = installSecurityContext(configuration);

This covers security-related setup; we will not look at it in detail.

Next comes:

/**
 * Step 8: the cluster startup entry point
 * */
runCluster(configuration, pluginManager);

This is the actual entry point for starting the cluster.

I won't go into all the details here; there is a lot of it, so we will come back to it later.

Let's step in for a quick look.

initializeServices(configuration, pluginManager);
//write the JobManager address into the configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

This initializes the services and writes the JobManager's hostname and port into the configuration.

final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);

------------------------------------------------------
@Override
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
/**
 * Note:
 * 1. Argument: a StandaloneResourceManagerFactory instance
 * 2. Return value: a DefaultDispatcherResourceManagerComponentFactory instance
 * Step into createSessionComponentFactory
 */
return DefaultDispatcherResourceManagerComponentFactory
.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
}
-------------------------------------------------------
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(ResourceManagerFactory<?> resourceManagerFactory) {

/**
 * Instantiate the factories:
 * 1. resourceManagerFactory = StandaloneResourceManagerFactory
 * 2. dispatcherRunnerFactory = DefaultDispatcherRunnerFactory
 * 3. restEndpointFactory = SessionRestEndpointFactory
 *
 * Step into new DefaultDispatcherResourceManagerComponentFactory()
 */
return new DefaultDispatcherResourceManagerComponentFactory(
DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE), resourceManagerFactory,
SessionRestEndpointFactory.INSTANCE);
}
---------------------------------------------------------

Here three factories are instantiated:

  • DefaultDispatcherRunnerFactory (created with SessionDispatcherFactory)

  • StandaloneResourceManagerFactory

  • SessionRestEndpointFactory

With these three factories instantiated, we return to ClusterEntrypoint and continue:

/**
 * dispatcherResourceManagerComponentFactory -> wraps the three core factories
 * Step 9: use the factories to create and start the core components:
 *   1. Dispatcher
 *   2. ResourceManager
 *   3. WebMonitorEndpoint
 * Step into the create method
 */
clusterComponent = dispatcherResourceManagerComponentFactory
.create(configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);