# Builds the Flink classpath from every jar found under $FLINK_LIB_DIR.
#
# Jars are visited in sorted order. Jars whose *file name* starts with
# "flink-dist" are moved to the end of the classpath; all other jars keep
# their sorted order. The classpath is printed on stdout.
# Globals:  FLINK_LIB_DIR (read)
# Outputs:  classpath on stdout, error message on stderr
# Exits 1 (with nothing on stdout) if no flink-dist jar is found.
constructFlinkClassPath() {
    local FLINK_DIST=""
    local FLINK_CLASSPATH=""
    local jarfile

    while IFS= read -d '' -r jarfile; do
        # Match on the file name only: the lib directory itself may live under a
        # path that contains "flink-dist" (e.g. a flink-dist/target build tree),
        # which must not turn every jar into a "dist" jar.
        if [[ "${jarfile##*/}" == flink-dist*.jar ]]; then
            FLINK_DIST="${FLINK_DIST:+$FLINK_DIST:}$jarfile"
        else
            FLINK_CLASSPATH="${FLINK_CLASSPATH:+$FLINK_CLASSPATH:}$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

    if [[ -z "$FLINK_DIST" ]]; then
        # write error message to stderr since stdout is stored as the classpath
        echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR." >&2
        # exit function with empty classpath to force process failure
        exit 1
    fi

    # Only insert the ':' separator when both parts are non-empty; a leading
    # ':' would add an empty classpath entry (the current directory).
    echo "${FLINK_CLASSPATH:+$FLINK_CLASSPATH:}$FLINK_DIST"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
# "cygpath" can do the conversion.
#
# $1: a single path
# Prints the path unchanged, or converted to Windows form under Cygwin.
# The output is not re-split by the shell, so paths with spaces or glob
# characters are preserved exactly.
manglePath() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        cygpath -w "$1"
    else
        printf '%s\n' "$1"
    fi
}
# Like manglePath, but for a path list, for example a java classpath.
# $1: ':'-separated path list
# Prints the list unchanged, or converted to Windows form under Cygwin.
manglePathList() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        cygpath -wp "$1"
    else
        printf '%s\n' "$1"
    fi
}

# Looks up a config value by key from a simple YAML-style key-value map.
# $1: key to look up
# $2: default value to return if the key does not exist (or has an empty value)
# $3: config file to read from
# Prints the value, with surrounding spaces and any trailing '#' comment removed.
# If the key occurs several times, the last occurrence wins.
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3
    # Keys such as 'env.java.home' contain '.', which is a wildcard in sed;
    # escape it so that 'a.b' does not also match 'aXb'.
    local keyPattern=${key//./\\.}
    local value

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    value=$(sed -n "s/^[ ]*${keyPattern}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1)

    if [ -z "$value" ]; then
        echo "$defaultValue"
    else
        echo "$value"
    fi
}

########################################################################################################################
# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
# -or- the respective environment variables are not set.
########################################################################################################################

# WARNING !!! These values are only used if nothing else is specified in
# conf/flink-conf.yaml
# Directory to store *.pid files to
DEFAULT_ENV_PID_DIR="/tmp"
# Maximum number of old log files to keep
DEFAULT_ENV_LOG_MAX=5

# Optional JVM args: common, JobManager, TaskManager, HistoryServer
DEFAULT_ENV_JAVA_OPTS=""
DEFAULT_ENV_JAVA_OPTS_JM=""
DEFAULT_ENV_JAVA_OPTS_TM=""
DEFAULT_ENV_JAVA_OPTS_HS=""

# Optional SSH parameters running in cluster mode
DEFAULT_ENV_SSH_OPTS=""

# YARN / Hadoop configuration directories, if necessary
DEFAULT_YARN_CONF_DIR=""
DEFAULT_HADOOP_CONF_DIR=""

########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
########################################################################################################################
if [ -z "$number" ]; then echo "text does not start with a number" exit 1 fi
local multiplier if [ -z "$unit" ]; then multiplier=1 else if matchesAny $unit "${BYTES_UNITS[*]}"; then multiplier=1 elif matchesAny $unit "${KILO_BYTES_UNITS[*]}"; then multiplier=1024 elif matchesAny $unit "${MEGA_BYTES_UNITS[*]}"; then multiplier=`expr 1024 \* 1024` elif matchesAny $unit "${GIGA_BYTES_UNITS[*]}"; then multiplier=`expr 1024 \* 1024 \* 1024` elif matchesAny $unit "${TERA_BYTES_UNITS[*]}"; then multiplier=`expr 1024 \* 1024 \* 1024 \* 1024` else echo "[ERROR] Memory size unit $unit does not match any of the recognized units" exit 1 fi fi
((result=$number * $multiplier))
if [ $[result / multiplier] != "$number" ]; then echo "[ERROR] The value $text cannot be re represented as 64bit number of bytes (numeric overflow)." exit 1 fi
echo "$result" }
matchesAny() { str=$1 variants=$2
for s in ${variants[*]}; do if [ $str == $s ]; then return 0 fi done
target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target." >&2
        break
    fi
    ls=$(ls -ld -- "$target")
    link=$(expr "$ls" : '.* -> \(.*\)$')
    # A relative symlink target is relative to the directory that contains
    # the link, not to the current working directory.
    case "$link" in
        /*) target="$link" ;;
        *) target="$(dirname "$target")/$link" ;;
    esac
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path and resolve directory symlinks
bin=$(dirname "$target")
SYMLINK_RESOLVED_BIN=$(cd "$bin"; pwd -P)

# Define the main directory of the flink installation.
# If config.sh is called by pyflink-shell.sh in the python bin directory (pip installed),
# FLINK_HOME has already been determined and is not set here.
if [ -z "$_FLINK_HOME_DETERMINED" ]; then
    FLINK_HOME=$(dirname "$SYMLINK_RESOLVED_BIN")
fi
FLINK_LIB_DIR=$FLINK_HOME/lib
FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
FLINK_OPT_DIR=$FLINK_HOME/opt

# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
FLINK_HOME_DIR_MANGLED=$(manglePath "$FLINK_HOME")
if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
FLINK_CONF_FILE="flink-conf.yaml"
YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}

### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

########################################################################################################################
# ENVIRONMENT VARIABLES
########################################################################################################################

# read JAVA_HOME from config with no default value
MY_JAVA_HOME=$(readFromConfig "${KEY_ENV_JAVA_HOME}" "" "${YAML_CONF}")
# check if config specified JAVA_HOME
if [ -z "${MY_JAVA_HOME}" ]; then
    # config did not specify JAVA_HOME. Use system JAVA_HOME
    MY_JAVA_HOME=${JAVA_HOME}
fi
# check if we have a valid JAVA_HOME and if java is not available
if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
    echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME." >&2
    exit 1
else
    JAVA_HOME=${MY_JAVA_HOME}
fi
# Pick the java launcher: under Cygwin always rely on 'java' from PATH;
# elsewhere prefer $JAVA_HOME/bin/java when JAVA_HOME is an existing directory.
UNAME=$(uname -s)
JAVA_RUN=java
if [ "${UNAME:0:6}" != "CYGWIN" ] && [[ -d $JAVA_HOME ]]; then
    JAVA_RUN=$JAVA_HOME/bin/java
fi

# Define HOSTNAME if it is not already set
: "${HOSTNAME:=$(hostname)}"
IS_NUMBER="^[0-9]+$"

# JobManager heap: environment first, then the size key; when that yields 0,
# also read the deprecated megabyte key.
: "${FLINK_JM_HEAP:=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")}"
if [[ "${FLINK_JM_HEAP}" == "0" ]]; then
    FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi

# TaskManager heap: same lookup order as the JobManager.
: "${FLINK_TM_HEAP:=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")}"
if [[ "${FLINK_TM_HEAP}" == "0" ]]; then
    FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
fi

# Managed memory size (converted to MiB); a value without a unit is taken as megabytes.
if [[ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
    if ! hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
        FLINK_TM_MEM_MANAGED_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}m
    fi
    FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
fi

# Remaining TaskManager memory settings, each only if not already set.
: "${FLINK_TM_MEM_MANAGED_FRACTION:=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")}"
: "${FLINK_TM_OFFHEAP:=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")}"
: "${FLINK_TM_MEM_PRE_ALLOCATE:=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")}"
: "${FLINK_TM_NET_BUF_FRACTION:=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")}"

# Fallback: when neither network buffer bound is set, derive both from the
# legacy key (-1 means "not configured").
# NOTE(review): the legacy value is parsed as a byte size here; confirm this
# matches how the Java side interprets KEY_TASKM_NET_BUF_NR.
if [[ -z "${FLINK_TM_NET_BUF_MIN}" && -z "${FLINK_TM_NET_BUF_MAX}" ]]; then
    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
    if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
        FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
        FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
    fi
fi

# Lower bound; default 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
if [[ -z "${FLINK_TM_NET_BUF_MIN}" || "${FLINK_TM_NET_BUF_MIN}" == "-1" ]]; then
    FLINK_TM_NET_BUF_MIN=$(parseBytes $(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}"))
fi

# Upper bound; default 1GB = 1073741824 bytes
if [[ -z "${FLINK_TM_NET_BUF_MAX}" || "${FLINK_TM_NET_BUF_MAX}" == "-1" ]]; then
    FLINK_TM_NET_BUF_MAX=$(parseBytes $(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}"))
fi

# NUMA support can only be enabled when the numactl tool is installed.
if command -v numactl >/dev/null 2>&1; then
    : "${FLINK_TM_COMPUTE_NUMA:=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")}"
else
    FLINK_TM_COMPUTE_NUMA="false"
fi
# Each setting keeps its environment value when non-empty; otherwise it is
# read from conf/flink-conf.yaml, falling back to the matching DEFAULT_* value.
: "${MAX_LOG_FILE_NUMBER:=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")}"
: "${FLINK_LOG_DIR:=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")}"
: "${YARN_CONF_DIR:=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")}"
: "${HADOOP_CONF_DIR:=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")}"
: "${FLINK_PID_DIR:=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")}"
# Removes one leading and one trailing double quote (if present) from $1 and
# prints the result, e.g. '"-Xmx1g -Dfoo=bar"' -> '-Xmx1g -Dfoo=bar'.
# Uses parameter expansion instead of 'echo | sed', which mangles values that
# echo treats as its own options (such as '-n' or '-e').
stripOuterDoubleQuotes() {
    local value=$1
    value=${value#\"}
    value=${value%\"}
    printf '%s\n' "$value"
}

# JVM options (common, JobManager, TaskManager, HistoryServer): environment
# first, then conf/flink-conf.yaml, with surrounding double quotes removed from
# values read from the config file.
if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
    FLINK_ENV_JAVA_OPTS=$(stripOuterDoubleQuotes "${FLINK_ENV_JAVA_OPTS}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    FLINK_ENV_JAVA_OPTS_JM=$(stripOuterDoubleQuotes "${FLINK_ENV_JAVA_OPTS_JM}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
    FLINK_ENV_JAVA_OPTS_TM=$(stripOuterDoubleQuotes "${FLINK_ENV_JAVA_OPTS_TM}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
    FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
    FLINK_ENV_JAVA_OPTS_HS=$(stripOuterDoubleQuotes "${FLINK_ENV_JAVA_OPTS_HS}")
fi
: "${FLINK_SSH_OPTS:=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")}"

# Define ZK_HEAP if it is not already set
: "${ZK_HEAP:=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")}"

# High availability mode: environment, then the config key, then the deprecated
# 'recovery.mode' key. Its old 'standalone' value (or no value at all) maps to 'none'.
if [ -z "${HIGH_AVAILABILITY}" ]; then
    HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
    if [ -z "${HIGH_AVAILABILITY}" ]; then
        DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
        case "${DEPRECATED_HA}" in
            "" | standalone) HIGH_AVAILABILITY="none" ;;
            *) HIGH_AVAILABILITY=${DEPRECATED_HA} ;;
        esac
    fi
fi

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
: "${JVM_ARGS:=}"

# Deprecated HADOOP_HOME: derive HADOOP_CONF_DIR from it when HADOOP_CONF_DIR is empty.
# The Hadoop 2.2+ layout (etc/hadoop) wins over the Hadoop 1.x layout (conf).
if [[ -z "$HADOOP_CONF_DIR" && -n "$HADOOP_HOME" ]]; then
    if [[ -d "$HADOOP_HOME/conf" ]]; then
        HADOOP_CONF_DIR="$HADOOP_HOME/conf"
    fi
    if [[ -d "$HADOOP_HOME/etc/hadoop" ]]; then
        HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
    fi
fi

# Last resort: a common system-wide default location.
if [[ -z "$HADOOP_CONF_DIR" && -d "/etc/hadoop/conf" ]]; then
    echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set."
    HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
if [ -n "${HBASE_CONF_DIR}" ]; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxiliary function which extracts the name of a host from a line which
# also potentially includes topology information and the taskManager type.
# $1: an entry from the masters/slaves file
# Prints the entry with any '#' comment and surrounding whitespace removed;
# for a '/'-separated network hierarchy only the last component is kept.
extractHostName() {
    # handle comments: keep only the part before the first '#'
    local host=${1%%#*}
    # trim leading and trailing whitespace
    host=${host#"${host%%[![:space:]]*}"}
    host=${host%"${host##*[![:space:]]}"}

    # Extract the hostname from the network hierarchy
    if [[ "$host" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
        host=${BASH_REMATCH[1]}
    fi

    printf '%s\n' "$host"
}

# Auxiliary functions for log file rotation

# Rotates every distinct log file below directory $1 whose path starts with $2.
# $1: directory to search
# $2: path prefix (matched with find -path) of the log files to rotate
# Variables are local so callers' globals (e.g. a 'log' variable) are not clobbered.
rotateLogFilesWithPrefix() {
    local dir=$1
    local prefix=$2
    local log
    while IFS= read -r log; do
        rotateLogFile "$log"
    # find distinct set of log file names, ignoring the rotation number (trailing dot and digits);
    # the sed expression is quoted so '\.' reaches sed intact and '[0-9]' is not globbed.
    done < <(find "$dir" ! -type d -path "${prefix}*" | sed -E 's/\.[0-9]+$//' | sort -u)
}

# Rotates one log file: log.(N-1) -> log.N, ..., log.1 -> log.2, log -> log.1,
# where N is MAX_LOG_FILE_NUMBER. Does nothing if the file does not exist or
# MAX_LOG_FILE_NUMBER is not positive.
# $1: path of the log file
rotateLogFile() {
    local log=$1
    local num=$MAX_LOG_FILE_NUMBER
    local prev
    if [[ -f "$log" && "$num" -gt 0 ]]; then
        while [[ "$num" -gt 1 ]]; do
            prev=$((num - 1))
            if [[ -f "$log.$prev" ]]; then
                mv "$log.$prev" "$log.$num"
            fi
            num=$prev
        done
        mv "$log" "$log.$num"
    fi
}
if [[ ! -f "${MASTERS_FILE}" ]]; then echo "No masters file. Please specify masters in 'conf/masters'." exit 1 fi
MASTERS=() WEBUIPORTS=()
MASTERS_ALL_LOCALHOST=true GOON=true while $GOON; do read line || GOON=false HOSTWEBUIPORT=$( extractHostName $line)
if [ -n "$HOSTWEBUIPORT" ]; then HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:) WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:) MASTERS+=(${HOST})
if [ -z "$WEBUIPORT" ]; then WEBUIPORTS+=(0) else WEBUIPORTS+=(${WEBUIPORT}) fi
if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then MASTERS_ALL_LOCALHOST=false fi fi done < "$MASTERS_FILE" } # 12.循环遍历slaves文件里面的节点 readSlaves() { SLAVES_FILE="${FLINK_CONF_DIR}/slaves"
if [[ ! -f "$SLAVES_FILE" ]]; then echo "No slaves file. Please specify slaves in 'conf/slaves'." exit 1 fi
SLAVES=()
SLAVES_ALL_LOCALHOST=true GOON=true while $GOON; do read line || GOON=false HOST=$( extractHostName $line) if [ -n "$HOST" ] ; then #获取所有slaves信息 SLAVES+=(${HOST}) #是否是本地模式判断 if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then SLAVES_ALL_LOCALHOST=false fi fi done < "$SLAVES_FILE" } # starts or stops TMs on all slaves # TMSlaves start|stop #10.启动所有的TaskManager TMSlaves() { CMD=$1 #11.读取slaves配置文件内容 readSlaves
if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then # all-local setup for slave in ${SLAVES[@]}; do #13.本机直接启动 "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}" done else # non-local setup # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available command -v pdsh >/dev/null 2>&1 if [[ $? -ne 0 ]]; then for slave in ${SLAVES[@]}; do #14. 远程启动 ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &" done else PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\"" fi fi }
HAVE_AWK=

# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config)
#
# Computes the TaskManager network buffer memory in bytes.
# Globals (read): FLINK_TM_HEAP_MB, FLINK_TM_NET_BUF_MIN, FLINK_TM_NET_BUF_MAX,
#   FLINK_TM_NET_BUF_FRACTION, FLINK_CONF_FILE and the KEY_* names used in messages.
# Globals (written): HAVE_AWK caches the awk availability check. Note that when
#   this function runs inside $(...), the cache does not reach the caller.
# Outputs: the size in bytes on stdout; errors on stderr, so they are shown to the
#   user instead of being captured as the "number" by callers using $(...).
# Exits 1 on invalid configuration. Inside $(...) this ends only that subshell.
calculateNetworkBufferMemory() {
    local network_buffers_bytes
    if [ "${FLINK_TM_HEAP_MB}" -le "0" ]; then
        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})." >&2
        exit 1
    fi

    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
        # fix memory size for network buffers
        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
    else
        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
            {
                echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
                echo "Min must be less than or equal to max."
                echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
            } >&2
            exit 1
        fi

        # Bash only performs integer arithmetic so floating point computation is performed using awk
        if [[ -z "${HAVE_AWK}" ]] ; then
            if ! command -v awk >/dev/null 2>&1; then
                {
                    echo "[ERROR] Program 'awk' not found."
                    echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
                } >&2
                exit 1
            fi
            HAVE_AWK=true
        fi

        # We calculate the memory using a fraction of the total memory
        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
            {
                echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
                echo "It must be between 0.0 and 1.0."
                echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
            } >&2
            exit 1
        fi

        # heap (MB -> bytes) * fraction, clamped to [min, max] and rounded to an integer.
        # Values are passed with -v instead of being spliced into the program text.
        network_buffers_bytes=$(awk \
            -v heap_mb="${FLINK_TM_HEAP_MB}" \
            -v fraction="${FLINK_TM_NET_BUF_FRACTION}" \
            -v buf_min="${FLINK_TM_NET_BUF_MIN}" \
            -v buf_max="${FLINK_TM_NET_BUF_MAX}" \
            'BEGIN { x = heap_mb * 1048576 * fraction; netbuf = x > buf_max ? buf_max : x < buf_min ? buf_min : x; printf "%.0f\n", netbuf }')
    fi

    # recalculate the JVM heap memory by taking the network buffers into account
    local tm_heap_size_bytes=$((${FLINK_TM_HEAP_MB} << 20)) # megabytes to bytes
    if [[ "${tm_heap_size_bytes}" -le "${network_buffers_bytes}" ]]; then
        echo "[ERROR] Configured TaskManager memory size (${FLINK_TM_HEAP_MB} MB, from '${KEY_TASKM_MEM_SIZE}') must be larger than the network buffer memory size (${network_buffers_bytes} bytes, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')." >&2
        exit 1
    fi

    echo ${network_buffers_bytes}
}

# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
calculateTaskManagerHeapSizeMB() {
    if [ "${FLINK_TM_HEAP_MB}" -le "0" ]; then
        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
        exit 1
    fi
local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes # network buffers are always off-heap and thus need to be deduced from the heap memory size local tm_heap_size_mb=$((${FLINK_TM_HEAP_MB} - network_buffers_mb))
if useOffHeapMemory; then
if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then # We split up the total memory in heap and off-heap memory if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP_MB} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')." exit 1 fi
tm_heap_size_mb=$((tm_heap_size_mb - FLINK_TM_MEM_MANAGED_SIZE)) else # Bash only performs integer arithmetic so floating point computation is performed using awk if [[ -z "${HAVE_AWK}" ]] ; then command -v awk >/dev/null 2>&1 if [[ $? -ne 0 ]]; then echo "[ERROR] Program 'awk' not found." echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}." exit 1 fi HAVE_AWK=true fi
# We calculate the memory using a fraction of the total memory if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value." echo "It must be between 0.0 and 1.0." echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}." exit 1 fi
# recalculate the JVM heap memory by taking the off-heap ratio into account local offheap_managed_memory_size=`awk "BEGIN { printf \"%.0f\n\", ${tm_heap_size_mb} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"` tm_heap_size_mb=$((tm_heap_size_mb - offheap_managed_memory_size)) fi fi
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`" else flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP}) FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes}) fi
if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}." exit 1 fi
if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m" fi
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
# if memory allocation mode is lazy and no other JVM options are set, # set the 'Concurrent Mark Sweep GC' if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC" fi
if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`" else flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP}) FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes}) fi
if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}." exit 1 fi
if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB) # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used TM_MAX_OFFHEAP_SIZE="8388607T"
mkdir -p "$FLINK_PID_DIR" # Log files for daemons are indexed from the process ID's position in the PID # file. The following lock prevents a race condition during daemon startup # when multiple daemons read, index, and write to the PID file concurrently. # The lock is created on the PID directory since a lock file cannot be safely # removed. The daemon is started with the lock closed and the lock remains # active in this script until the script exits. command -v flock >/dev/null 2>&1 if [[ $? -eq 0 ]]; then exec 200<"$FLINK_PID_DIR" flock 200 fi # Ascending ID depending on number of lines in pid file. # This allows us to start multiple daemon of each type. id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') # Only set JVM 8 arguments if we have correctly extracted the version if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then if [ "$JAVA_VERSION" -lt 18 ]; then JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m" fi fi
# Print a warning if daemons are already running on host if [ -f "$pid" ]; then active=() while IFS='' read -r p || [[ -n "$p" ]]; do kill -0 $p >/dev/null 2>&1 if [ $? -eq 0 ]; then active+=($p) fi done < "${pid}"
count="${#active[@]}"
if [ ${count} -gt 0 ]; then echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi fi
# Evaluate user options for local variable expansion FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
# Add to pid file if successful start if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then echo $mypid >> "$pid" else echo "Error starting $DAEMON daemon." exit 1 fi ;;
(stop) if [ -f "$pid" ]; then # Remove last in pid file to_stop=$(tail -n 1 "$pid")
if [ -z $to_stop ]; then rm "$pid" # If all stopped, clean up pid file echo "No $DAEMON daemon to stop on host $HOSTNAME." else sed \$d "$pid" > "$pid.tmp" # all but last line
# If all stopped, clean up pid file [ $(wc -l < "$pid.tmp") -eq 0 ] && rm "$pid" "$pid.tmp" || mv "$pid.tmp" "$pid"
if kill -0 $to_stop > /dev/null 2>&1; then echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME." kill $to_stop else echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME." fi fi else echo "No $DAEMON daemon to stop on host $HOSTNAME." fi ;;
(stop-all) if [ -f "$pid" ]; then mv "$pid" "${pid}.tmp"
while read to_stop; do if kill -0 $to_stop > /dev/null 2>&1; then echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME." kill $to_stop else echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME." fi done < "${pid}.tmp" rm "${pid}.tmp" fi ;;
if [[ ! -f "$SLAVES_FILE" ]]; then echo "No slaves file. Please specify slaves in 'conf/slaves'." exit 1 fi
SLAVES=()
SLAVES_ALL_LOCALHOST=true GOON=true while $GOON; do read line || GOON=false HOST=$( extractHostName $line) if [ -n "$HOST" ] ; then #获取所有slaves信息 SLAVES+=(${HOST}) #是否是本地模式判断 if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then SLAVES_ALL_LOCALHOST=false fi fi done < "$SLAVES_FILE" } # starts or stops TMs on all slaves # TMSlaves start|stop #10.启动所有的TaskManager TMSlaves() { CMD=$1 #11.读取slaves配置文件内容 readSlaves
if [ ${SLAVES_ALL_LOCALHOST} = true ] ; then # all-local setup for slave in ${SLAVES[@]}; do #13.本机直接启动 "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}" done else # non-local setup # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available command -v pdsh >/dev/null 2>&1 if [[ $? -ne 0 ]]; then for slave in ${SLAVES[@]}; do #14. 远程启动 ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &" done else PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${SLAVES[*]}") \ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\"" fi fi }
public T parse(@Nonnull String[] args)throws FlinkParseException { finalDefaultParserparser=newDefaultParser(); finalOptionsoptions= parserResultFactory.getOptions();
final CommandLine commandLine; try { commandLine = parser.parse(options, args, true); } catch (ParseException e) { thrownewFlinkParseException("Failed to parse the command line arguments.", e); }
publicstaticfinalOptionREST_PORT_OPTION= Option.builder("r") .longOpt("webui-port") .required(false) .hasArg(true) .argName("rest port") .desc("Port for the rest endpoint and the web UI.") .build();
publicstaticfinalOptionDYNAMIC_PROPERTY_OPTION= Option.builder("D") .argName("property=value") .numberOfArgs(2) .valueSeparator('=') .desc("use value for given property") .build(); //这里就可以看到这些就是对这个后面传来的这些配置的解析
if (configDir == null) { thrownewIllegalArgumentException("Given configuration directory is null, cannot load configuration"); }
finalFileconfDirFile=newFile(configDir); if (!(confDirFile.exists())) { thrownewIllegalConfigurationException( "The given configuration directory name '" + configDir + "' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory."); } // final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);这里的这个是对解析配置文件 --------------------------------------------------------------------------------------------------- publicfinalclassGlobalConfiguration {