Implementation comparison and practice of distributed computing engines Flink/Spark on k8s


The underlying resource management platform of distributed batch and stream computing frameworks, represented by Flink and Spark, has gradually shifted from YARN in the Hadoop ecosystem to Kubernetes' native scheduler and peripheral resource schedulers such as Volcano and Yunikorn. This article briefly compares how the two computing frameworks support and implement native Kubernetes, and what is needed to apply them in production environments.

1. What is Native

The "native" here means that the computing framework applies for resources from Kubernetes directly. By analogy, many computing frameworks running on YARN need to implement an AppMaster to apply for resources from YARN's ResourceManager; native k8s support means the computing framework implements a role similar to an AppMaster that applies for resources from k8s. Of course, it is not literally an AppMaster (an AppMaster has to be implemented according to YARN's standards).

2. Using Spark on k8s

Submitting Jobs

Submitting a job to a k8s cluster is very similar to submitting it to YARN. The commands are as follows. The main differences include:

The --master parameter specifies the ApiServer of the k8s cluster
You need to specify the image to run the job in k8s through the parameter spark.kubernetes.container.image.
Specify the main jar, which needs to be accessible to the driver process: if the driver runs in a pod, the jar package needs to be included in the image; if the driver runs locally, the jar needs to be local.
Specify the name of the app through --name or spark.app.name. The driver name after the job is running will be prefixed with the app name. Of course, you can also specify the driver name directly through the parameter spark.kubernetes.driver.pod.name

    $ ./bin/spark-submit \
        --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
        --deploy-mode cluster \
        --name spark-pi \
        --class org.apache.spark.examples.SparkPi \
        --conf spark.executor.instances=5 \
        --conf spark.kubernetes.container.image=<spark-image> \
        local:///path/to/examples.jar

After the command is submitted, spark-submit creates a driver pod and a corresponding service, and then the driver creates the executor pods and runs the job.

deploy-mode

Just like using Spark on YARN, k8s also supports cluster and client modes:

cluster mode: driver runs as a pod on a k8s cluster.
Client mode: the driver runs on the machine where the job is submitted, and then creates executors on the k8s cluster. To ensure that the executors can register with the driver, the submitting machine must be reachable from the executor network inside the k8s cluster (the executors need to connect back to the driver in order to register).
Resource Cleanup

The resources here mainly refer to the job's driver and executor pods. Spark links the job's various resources through the owner reference mechanism of k8s, so that when the driver pod is deleted, the associated executor pods are deleted as well. However, if there is no driver pod, i.e. the job runs in client mode, the following two situations are involved in resource cleanup:

The job is completed, the driver process exits, and the executor pod automatically exits after running.
If the driver process is killed, the executor pod will not be able to connect to the driver and will exit automatically. For more information, please refer to: https://kubernetes.io/docs/concepts/architecture/garbage-collection/

Dependency Management

As mentioned above, the main jar must be accessible to the driver process. In cluster mode this means packaging the main jar into the Spark image, but during day-to-day development and debugging, rebuilding an image for every change is far too costly. Spark therefore supports referencing local files at submission time and using S3 as a relay: the files are uploaded first and then downloaded from S3 when the job runs. The following is an example.

    ...
    --packages org.apache.hadoop:hadoop-aws:3.2.0
    --conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
    --conf spark.hadoop.fs.s3a.access.key=...
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
    --conf spark.hadoop.fs.s3a.fast.upload=true
    --conf spark.hadoop.fs.s3a.secret.key=....
    --conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
    file:///full/path/to/app.jar

Pod Template

When a k8s controller (such as a Deployment or a Job) creates a Pod, it creates it based on the pod template in the spec. The following is an example of a Job.

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: hello
    spec:
      template:
        # The following is a pod template
        spec:
          containers:
          - name: hello
            image: busybox
            command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600']
          restartPolicy: OnFailure
        # The pod template ends here

When we submit a spark job through spark-submit, the final k8s resources (driver/executor pod) are constructed by the internal logic of spark. But sometimes we want to do some additional work on the driver/executor pod, such as adding a sidecar container to do some log collection. In this scenario, PodTemplate is a better choice. At the same time, PodTemplate also decouples spark from the underlying infrastructure (k8s). For example, if k8s releases a new version to support some new features, we only need to modify our PodTemplate without involving internal changes to spark.
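For instance, a minimal driver pod template that adds a log-collecting sidecar could look like the sketch below. It is only an illustration, not the article's setup: the sidecar image and the shared log path are assumptions, and the template would be passed to spark-submit via spark.kubernetes.driver.podTemplateFile (and spark.kubernetes.executor.podTemplateFile for executors).

    apiVersion: v1
    kind: Pod
    spec:
      containers:
      # Spark merges its generated driver spec into the container with the default
      # template container name (spark-kubernetes-driver); image etc. are filled in by Spark.
      - name: spark-kubernetes-driver
        volumeMounts:
        - name: app-logs
          mountPath: /var/log/spark
      # Hypothetical sidecar that ships log files written to the shared volume
      - name: log-agent
        image: fluent/fluent-bit:1.8
        volumeMounts:
        - name: app-logs
          mountPath: /var/log/spark
          readOnly: true
      volumes:
      - name: app-logs
        emptyDir: {}

Note that the sidecar only sees what is written to the shared volume, so the job's log configuration would also need to write its log files there.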

RBAC

RBAC stands for Role-based access control, which is a permission control mechanism in k8s. In simple terms:

RBAC defines sets of permissions, such as create/delete/watch/list on pods; the objects that carry these permission sets are called Role or ClusterRole.
RBAC also includes role bindings, which assign a Role/ClusterRole to one subject or a group of subjects, such as a ServiceAccount or a user account.
In order to run Spark jobs in a k8s cluster, we also need a set of RBAC resources:

Specify the serviceaccount under the namespace
Role or ClusterRole that defines permission rules. We can use the common ClusterRole "edit" (which has operation permissions for almost all resources, such as create/delete/watch, etc.)
The following command grants the serviceaccount spark the permission to operate other resources in the spark namespace. Then, as long as the driver pod of spark mounts the serviceaccount, it can create an executor pod.

    $ kubectl create serviceaccount spark
    $ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
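Equivalently, the same setup can be declared with manifests; the sketch below (assuming the spark namespace already exists) binds the built-in ClusterRole edit only within the spark namespace, which is a slightly narrower grant than a ClusterRoleBinding:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: spark
      namespace: spark
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: spark-role
      namespace: spark
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: edit          # built-in ClusterRole with create/delete/watch on most namespaced resources
    subjects:
    - kind: ServiceAccount
      name: spark
      namespace: spark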

Here is a simple demonstration:

Submit the SparkPiSleep job to the k8s cluster using the following command.

    $ spark-submit \
        --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
        --deploy-mode cluster \
        --class org.apache.spark.examples.SparkPiSleep \
        --conf spark.executor.memory=2g \
        --conf spark.driver.memory=2g \
        --conf spark.driver.cores=1 \
        --conf spark.executor.cores=1 \
        --conf spark.app.name=test12 \
        --conf spark.kubernetes.submission.waitAppCompletion=false \
        --conf spark.kubernetes.container.image=<spark-image> \
        --conf spark.eventLog.enabled=false \
        --conf spark.shuffle.service.enabled=false \
        --conf spark.executor.instances=1 \
        --conf spark.dynamicAllocation.enabled=false \
        --conf spark.kubernetes.namespace=spark \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
        local:///path/to/main/jar

View the resources in the k8s cluster

    $ kubectl get po -n spark
    NAME                               READY   STATUS              RESTARTS   AGE
    spark-pi-5b88a27b576050dd-exec-1   0/1     ContainerCreating   0          2s
    test12-9fd3c27b576039ae-driver     1/1     Running             0          8s

The first one is the executor pod, and the second one is the driver pod. In addition, a service is created, through which the driver pod can be accessed, such as the Spark UI.

    $ kubectl get svc -n spark
    NAME                                 TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
    test12-9fd3c27b576039ae-driver-svc   ClusterIP   None         <none>        7078/TCP,7079/TCP,4040/TCP   110s

Let's take a look at the service owner reference. The executor pod is similar.

    $ kubectl get svc test12-9fd3c27b576039ae-driver-svc -n spark -oyaml
    apiVersion: v1
    kind: Service
    metadata:
      creationTimestamp: "2021-08-18T03:48:50Z"
      name: test12-9fd3c27b576039ae-driver-svc
      namespace: spark
      # The ownerReference of the service points to the driver pod.
      # As soon as the driver pod is deleted, the service is deleted as well.
      ownerReferences:
      - apiVersion: v1
        controller: true
        kind: Pod
        name: test12-9fd3c27b576039ae-driver
        uid: 56a50a66-68b5-42a0-b2f6-9a9443665d95
      resourceVersion: "9975441"
      uid: 06c1349f-be52-4133-80d9-07af34419b1f

3. Using Flink on k8s

Flink's native k8s implementation supports two modes:

Application mode: Start a Flink cluster (jm and tm) in a remote k8s cluster, and the driver runs in jm, which means that only detached mode is supported, not attached mode.
Session mode: Start a permanent Flink cluster (only JM) in the remote k8s cluster, and then submit jobs to it, and decide how many TMs to start based on actual conditions.
It is generally not recommended to use session mode in production, so the following mainly discusses application mode.

Flink's native k8s mode does not require specifying the number of tm. jm will calculate the number of tm required based on the user's code.

Submitting Jobs

The following is a simple submission command that needs to include:

The parameter run-application specifies application mode.
The parameter --target specifies running on k8s.
The parameter kubernetes.container.image specifies the Flink image used for the job.
Finally, the main jar must be specified; the path is a path inside the image.

    $ ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=my-first-application-cluster \
        -Dkubernetes.container.image=custom-image-name \
        local:///opt/flink/usrlib/my-flink-job.jar

Resource Cleanup

In Flink's native mode, a JobManager Deployment is created and handed to k8s to manage. The owner references of all resources belonging to the same job point to this Deployment, which means that deleting the Deployment cleans up all related resources. The following discusses how resources are cleaned up depending on the job's running state.

After the job runs to a terminal state (SUCCESS, FAILED, CANCELED, etc.), Flink cleans up all of the job's resources.
If the JobManager process fails to start (the jm container in the pod fails to start), the Deployment controller will keep restarting it. Likewise, if the JobManager pod is deleted, the Deployment will pull it up again. Only when the JobManager Deployment itself is deleted are all associated k8s resources removed.

Pod Template

Flink native mode also supports Pod Template, similar to Spark.

RBAC

Similar to Spark.

Dependency file management

Flink currently only supports the main jar and dependency files being baked into the image. This means users have to build a custom image just to submit a job, which is not a great experience. One workaround is to combine this with PodTemplate:

If the dependency is a local file, it needs to be uploaded to a remote storage for transfer, such as the object storage of major cloud vendors.
If the dependency is a remote file, no upload is required.
At runtime, an initContainer in the template downloads the user jar and dependency files into the Flink container and adds them to the classpath, as sketched below.
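A minimal sketch of such a pod template is shown below (the fetcher image and the download URL are placeholders); it would be passed to the job via kubernetes.pod-template-file, and jars placed under /opt/flink/usrlib end up on the user classpath:

    apiVersion: v1
    kind: Pod
    metadata:
      name: jobmanager-pod-template
    spec:
      initContainers:
      # Hypothetical fetcher: downloads the user jar before the Flink container starts
      - name: artifacts-fetcher
        image: busybox:latest
        command: ['wget', 'https://path/of/my-flink-job.jar', '-O', '/flink-artifact/my-flink-job.jar']
        volumeMounts:
        - name: flink-artifact
          mountPath: /flink-artifact
      containers:
      # Flink merges its generated spec into the container named flink-main-container
      - name: flink-main-container
        volumeMounts:
        - name: flink-artifact
          mountPath: /opt/flink/usrlib
      volumes:
      - name: flink-artifact
        emptyDir: {}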
Flink's job demo will not be demonstrated here.

4. Spark on Kubernetes Implementation

The implementation of Spark on Kubernetes is relatively simple:

Spark Client creates a k8s pod to run the driver
The driver creates the executor pods and starts running the job. After the job finishes, the driver pod enters the Completed state and the executor pods are cleaned up; we can still view the driver logs through the driver pod afterwards.

Code Implementation

The native k8s implementation code of Spark lives in the resource-managers/kubernetes module. We can start the analysis from the SparkSubmit code, focusing mainly on the code path for cluster deploy-mode.

    // Set the cluster manager
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("k8s") => KUBERNETES
      case m if m.startsWith("local") => LOCAL
      case _ =>
        error("Master must either be yarn or start with spark, mesos, k8s, or local")
        -1
    }

First, whether the job targets k8s is determined from the scheme of the spark.master configuration, which, as seen above, has the form --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>. For k8s cluster mode, the class org.apache.spark.deploy.k8s.submit.KubernetesClientApplication is loaded and its start method is run. Its core logic is to construct a driver pod from the parameters passed to spark-submit and submit it to k8s for execution.

    private[spark] class KubernetesClientApplication extends SparkApplication {

      override def start(args: Array[String], conf: SparkConf): Unit = {
        val parsedArguments = ClientArguments.fromCommandLineArgs(args)
        run(parsedArguments, conf)
      }

      private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
        // For constructing the app ID, we can't use the Spark application name, as the app ID is going
        // to be added as a label to group resources belonging to the same application. Label values are
        // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
        // a unique app ID (captured by spark.app.id) in the format below.
        val kubernetesAppId = KubernetesConf.getKubernetesAppId()
        val kubernetesConf = KubernetesConf.createDriverConf(
          sparkConf,
          kubernetesAppId,
          clientArguments.mainAppResource,
          clientArguments.mainClass,
          clientArguments.driverArgs,
          clientArguments.proxyUser)

        // The master URL has been checked for validity already in SparkSubmit.
        // We just need to get rid of the "k8s://" prefix here.
        val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
        val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)

        Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
          master,
          Some(kubernetesConf.namespace),
          KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
          SparkKubernetesClientFactory.ClientType.Submission,
          sparkConf,
          None,
          None)) { kubernetesClient =>
          val client = new Client(
            kubernetesConf,
            new KubernetesDriverBuilder(),
            kubernetesClient,
            watcher)
          client.run()
        }
      }
    }

The core of the above code is to create and run the client. This client is a client encapsulated by Spark and has a built-in k8s client.

    private[spark] class Client(
        conf: KubernetesDriverConf,
        builder: KubernetesDriverBuilder,
        kubernetesClient: KubernetesClient,
        watcher: LoggingPodStatusWatcher) extends Logging {

      def run(): Unit = {
        // Construct the driver's pod
        val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
        val configMapName = KubernetesClientUtils.configMapNameDriver
        val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(
          configMapName, conf.sparkConf, resolvedDriverSpec.systemProperties)
        val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap)

        // Modify the pod's container spec: add SPARK_CONF_DIR
        val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
          .addNewEnv()
            .withName(ENV_SPARK_CONF_DIR)
            .withValue(SPARK_CONF_DIR_INTERNAL)
            .endEnv()
          .addNewVolumeMount()
            .withName(SPARK_CONF_VOLUME_DRIVER)
            .withMountPath(SPARK_CONF_DIR_INTERNAL)
            .endVolumeMount()
          .build()
        val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
          .editSpec()
            .addToContainers(resolvedDriverContainer)
            .addNewVolume()
              .withName(SPARK_CONF_VOLUME_DRIVER)
              .withNewConfigMap()
                .withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava)
                .withName(configMapName)
                .endConfigMap()
              .endVolume()
            .endSpec()
          .build()
        val driverPodName = resolvedDriverPod.getMetadata.getName

        var watch: Watch = null
        var createdDriverPod: Pod = null
        try {
          // Create the driver pod through the k8s client
          createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
        } catch {
          case NonFatal(e) =>
            logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
            throw e
        }

        try {
          // Create other resources, add owner references, etc.
          val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
          addOwnerReference(createdDriverPod, otherKubernetesResources)
          kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
        } catch {
          case NonFatal(e) =>
            kubernetesClient.pods().delete(createdDriverPod)
            throw e
        }

        val sId = Seq(conf.namespace, driverPodName).mkString(":")
        // Watch the pod
        breakable {
          while (true) {
            val podWithName = kubernetesClient
              .pods()
              .withName(driverPodName)
            // Reset resource to old before we start the watch, this is important for race conditions
            watcher.reset()
            watch = podWithName.watch(watcher)

            // Send the latest pod state we know to the watcher to make sure we didn't miss anything
            watcher.eventReceived(Action.MODIFIED, podWithName.get())

            // Break the while loop if the pod is completed or we don't want to wait.
            // Whether to wait is controlled by "spark.kubernetes.submission.waitAppCompletion".
            if (watcher.watchOrStop(sId)) {
              watch.close()
              break
            }
          }
        }
      }
    }

The following briefly introduces how the driver manages the executors. When the Spark driver runs the user's main function, it creates a SparkSession, which contains a SparkContext; the SparkContext creates a SchedulerBackend to manage the executors' life cycle. The SchedulerBackend corresponding to k8s is KubernetesClusterSchedulerBackend. The following mainly looks at how this backend is created; a reasonable guess is that it is selected based on the "k8s" scheme of the spark.master URL.

The following is the core code logic of SparkContext creating SchedulerBackend.

    private def createTaskScheduler(...) = {
      case masterUrl =>
        // Create KubernetesClusterManager
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          // The KubernetesClusterManager created above will create the KubernetesClusterSchedulerBackend
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }

    // The method getClusterManager loads all ExternalClusterManager implementations
    // (KubernetesClusterManager and YarnClusterManager) through ServiceLoader,
    // and then filters by the master URL to select KubernetesClusterManager
    private def getClusterManager(url: String): Option[ExternalClusterManager] = {
      val loader = Utils.getContextOrSparkClassLoader
      val serviceLoaders =
        ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
      if (serviceLoaders.size > 1) {
        throw new SparkException(
          s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
      }
      serviceLoaders.headOption
    }

The following is the logic of KubernetesClusterSchedulerBackend managing Executor.

You can take a quick look at the code logic for creating an Executor.

    private def requestNewExecutors(
        expected: Int,
        running: Int,
        applicationId: String,
        resourceProfileId: Int,
        pvcsInUse: Seq[String]): Unit = {
      val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
      logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
        s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
      // Check reusable PVCs for this executor allocation batch
      val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
      for (_ <- 0 until numExecutorsToAllocate) {
        val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
        val executorConf = KubernetesConf.createExecutorConf(
          conf,
          newExecutorId.toString,
          applicationId,
          driverPod,
          resourceProfileId)
        // Construct the executor's pod spec
        val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
          kubernetesClient, rpIdToResourceProfile(resourceProfileId))
        val executorPod = resolvedExecutorSpec.pod
        val podWithAttachedContainer = new PodBuilder(executorPod.pod)
          .editOrNewSpec()
          .addToContainers(executorPod.container)
          .endSpec()
          .build()
        val resources = replacePVCsIfNeeded(
          podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
        // Create the executor pod
        val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
        try {
          // Add owner references
          addOwnerReference(createdExecutorPod, resources)
          resources
            .filter(_.getKind == "PersistentVolumeClaim")
            .foreach { resource =>
              if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
                addOwnerReference(driverPod.get, Seq(resource))
              }
              val pvc = resource.asInstanceOf[PersistentVolumeClaim]
              logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
                s"StorageClass ${pvc.getSpec.getStorageClassName}")
              kubernetesClient.persistentVolumeClaims().create(pvc)
            }
          newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
          logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
        } catch {
          case NonFatal(e) =>
            kubernetesClient.pods().delete(createdExecutorPod)
            throw e
        }
      }
    }

5. Flink on Kubernetes Implementation

Native K8s implementation of Flink:

Flink Client creates a Deployment for JobManager and then hosts the Deployment to k8s
k8s Deployment Controller creates JobManager Pod
The ResourceManager inside the JobManager requests resources from Kubernetes and creates the related TaskManager pods, which then start running the job. When the job reaches a terminal state, all related k8s resources are cleaned up. The code (based on the release-1.13 branch) is mainly implemented as follows:

CliFrontend is the entry point of the Flink client. Based on the command line argument run-application, it calls the runApplication method to create an ApplicationCluster.
KubernetesClusterDescriptor creates the JobManager-related Deployment and some necessary resources through the method deployApplicationCluster
The implementation class of the JobManager, JobMaster, calls the requestResource method of KubernetesResourceManagerDriver through the ResourceManager to create the TaskManagers and related resources.

KubernetesClusterDescriptor implements the interface ClusterDescriptor, which describes how to operate a Flink cluster. Depending on the underlying resources used, ClusterDescriptor has different implementations, including KubernetesClusterDescriptor, YarnClusterDescriptor, and StandaloneClusterDescriptor.

    public interface ClusterDescriptor<T> extends AutoCloseable {

        /* Returns a String containing details about the cluster (NodeManagers, available memory, ...). */
        String getClusterDescription();

        /* Query an existing Flink cluster. */
        ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;

        /** Create a Flink Session cluster. */
        ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
                throws ClusterDeploymentException;

        /** Create a Flink Application cluster. */
        ClusterClientProvider<T> deployApplicationCluster(
                final ClusterSpecification clusterSpecification,
                final ApplicationConfiguration applicationConfiguration)
                throws ClusterDeploymentException;

        /** Create a Per-Job cluster. */
        ClusterClientProvider<T> deployJobCluster(
                final ClusterSpecification clusterSpecification,
                final JobGraph jobGraph,
                final boolean detached)
                throws ClusterDeploymentException;

        /** Delete the cluster. */
        void killCluster(T clusterId) throws FlinkException;

        @Override
        void close();
    }

Let's take a quick look at the core logic of KubernetesClusterDescriptor: Creating an Application cluster.

    public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {

        private final Configuration flinkConfig;
        // Built-in k8s client
        private final FlinkKubeClient client;
        private final String clusterId;

        @Override
        public ClusterClientProvider<String> deployApplicationCluster(
                final ClusterSpecification clusterSpecification,
                final ApplicationConfiguration applicationConfiguration)
                throws ClusterDeploymentException {
            // Check whether the Flink cluster already exists in k8s
            if (client.getRestService(clusterId).isPresent()) {
                throw new ClusterDeploymentException(
                        "The Flink cluster " + clusterId + " already exists.");
            }

            final KubernetesDeploymentTarget deploymentTarget =
                    KubernetesDeploymentTarget.fromConfig(flinkConfig);
            if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) {
                throw new ClusterDeploymentException(
                        "Couldn't deploy Kubernetes Application Cluster."
                                + " Expected deployment.target="
                                + KubernetesDeploymentTarget.APPLICATION.getName()
                                + " but actual one was \""
                                + deploymentTarget
                                + "\"");
            }

            // Set application parameters: $internal.application.program-args and $internal.application.main
            applicationConfiguration.applyToConfiguration(flinkConfig);

            // Create the cluster
            final ClusterClientProvider<String> clusterClientProvider =
                    deployClusterInternal(
                            KubernetesApplicationClusterEntrypoint.class.getName(),
                            clusterSpecification,
                            false);

            try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
                LOG.info(
                        "Create flink application cluster {} successfully, JobManager Web Interface: {}",
                        clusterId,
                        clusterClient.getWebInterfaceURL());
            }
            return clusterClientProvider;
        }

        // Cluster creation logic
        private ClusterClientProvider<String> deployClusterInternal(
                String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
                throws ClusterDeploymentException {
            final ClusterEntrypoint.ExecutionMode executionMode =
                    detached
                            ? ClusterEntrypoint.ExecutionMode.DETACHED
                            : ClusterEntrypoint.ExecutionMode.NORMAL;
            flinkConfig.setString(
                    ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());

            flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);

            // The rpc, blob, rest and taskManagerRpc ports need to be exposed, so update them to fixed values.
            // Fixing the ports makes it easier to construct the k8s resources; because pods are isolated,
            // there are no port conflicts.
            KubernetesUtils.checkAndUpdatePortConfigOption(
                    flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
            KubernetesUtils.checkAndUpdatePortConfigOption(
                    flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
            KubernetesUtils.checkAndUpdatePortConfigOption(
                    flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);

            // HA configuration
            if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
                flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
                KubernetesUtils.checkAndUpdatePortConfigOption(
                        flinkConfig,
                        HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
                        flinkConfig.get(JobManagerOptions.PORT));
            }

            try {
                final KubernetesJobManagerParameters kubernetesJobManagerParameters =
                        new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);

                // Apply the PodTemplate, if one is configured
                final FlinkPod podTemplate =
                        kubernetesJobManagerParameters
                                .getPodTemplateFilePath()
                                .map(
                                        file ->
                                                KubernetesUtils.loadPodFromTemplateFile(
                                                        client, file, Constants.MAIN_CONTAINER_NAME))
                                .orElse(new FlinkPod.Builder().build());
                final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
                        KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                                podTemplate, kubernetesJobManagerParameters);

                // Core logic: create the JobManager Deployment and the other k8s resources,
                // such as the Service and ConfigMap, in k8s
                client.createJobManagerComponent(kubernetesJobManagerSpec);

                return createClusterClientProvider(clusterId);
            } catch (Exception e) {
                // ...
            }
        }
    }

In the above code, a PodTemplate is applied when building the JobManager. In simple terms, a PodTemplate is just a Pod manifest file.

The creation of TaskManager in the third step will not be repeated here.

7. Ecosystem

The word "ecosystem" may not be appropriate here. It mainly refers to what else can be done if this function is to be used in production. The following mainly discusses two functions used for troubleshooting in the production environment: logging and monitoring.

Logging

Log collection is a very important part of online systems. It is no exaggeration to say that 80% of faults can be found through logs. However, as mentioned earlier, Flink jobs will clean up all resources after the job runs to the final state, and Spark jobs will only retain the logs of the Driver Pod after running. So how can we collect complete job logs?

There are several options to choose from:

DaemonSet: a log collection agent is deployed on every k8s node as a DaemonSet to collect the logs of all containers running on that node and ship them to a unified log search platform such as ElasticSearch (a sketch of this approach follows below).
Sidecar: use the PodTemplate feature provided by Flink/Spark to add a sidecar container next to the main container that collects logs and ships them to a unified log service.
Both approaches rely on an external log service for storage and, ideally, search, such as an ELK stack or the log services of major cloud vendors.
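As a rough illustration of the DaemonSet approach mentioned above (the agent image is a placeholder, and a real deployment also needs the agent's own parsing/output configuration):

    apiVersion: apps/v1
    kind: DaemonSet
    metadata:
      name: log-agent
      namespace: logging
    spec:
      selector:
        matchLabels:
          app: log-agent
      template:
        metadata:
          labels:
            app: log-agent
        spec:
          containers:
          - name: log-agent
            image: fluent/fluent-bit:1.8   # placeholder log-collection agent
            volumeMounts:
            # Container stdout/stderr logs live under /var/log/containers on each node
            - name: varlog
              mountPath: /var/log
              readOnly: true
          volumes:
          - name: varlog
            hostPath:
              path: /var/log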

In addition, there is a simpler approach worth considering: use log4j's extension mechanism to write a custom appender whose append logic ships the logs directly to remote storage such as HDFS or object storage. This requires putting the jar containing the custom appender on the classpath of the running job, and it may affect the performance of the job's main process, so it is not recommended for performance-sensitive jobs.

Monitoring

At present, Prometheus has become the de facto monitoring standard in the k8s ecosystem, so the discussion below is about how to get the metrics of Flink/Spark jobs into Prometheus. Let's first take a quick look at Prometheus' architecture.

The key question is whether metrics reach the Prometheus Server by pull or by push:

For resident processes, such as online services, Prometheus Server generally pulls metrics actively from an API exposed by the process.
For processes that terminate, such as batch jobs, the process generally pushes its metrics actively: it pushes them to a resident PushGateway, and Prometheus Server then pulls the metrics from the PushGateway (a scrape-config sketch for this follows below).
These two patterns are also the ones officially recommended by Prometheus. It is not hard to see that the second approach could also cover the first scenario; however, because the PushGateway is a permanent component, it then has relatively high stability requirements.
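For the push path, the Prometheus Server side only needs a static scrape target pointing at the PushGateway; a minimal sketch (the PushGateway service address is an assumption):

    scrape_configs:
    - job_name: 'pushgateway'
      honor_labels: true      # keep the job/instance labels pushed by the batch jobs
      static_configs:
      - targets: ['pushgateway.monitoring.svc:9091']   # assumed PushGateway address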

Flink

Flink provides both a PrometheusReporter (which exposes metrics through an API that Prometheus Server pulls actively) and a PrometheusPushGatewayReporter (which actively pushes metrics to a PushGateway, so that Prometheus Server does not need to be aware of individual Flink jobs).
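As a reference, a flink-conf.yaml sketch for the two reporters along the lines of the Flink 1.13 metrics documentation; the host, port and job name values are assumptions, and normally only one of the two reporters would be enabled:

    # PrometheusReporter: expose metrics on a port inside the JM/TM pods for Prometheus to pull
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9249

    # PrometheusPushGatewayReporter: push metrics to a resident PushGateway
    metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    metrics.reporter.promgateway.host: pushgateway.monitoring.svc
    metrics.reporter.promgateway.port: 9091
    metrics.reporter.promgateway.jobName: my-flink-job
    metrics.reporter.promgateway.randomJobNameSuffix: true
    metrics.reporter.promgateway.deleteOnShutdown: false
    metrics.reporter.promgateway.interval: 60 SECONDS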

Among these two methods, PrometheusPushGatewayReporter will be a little simpler, but PushGateway may become a bottleneck. If you use PrometheusReporter, you need to introduce a service discovery mechanism to help Prometheus Server automatically discover the Endpoint of running Flink jobs. The mainstream service discovery mechanisms currently supported by Prometheus are mainly:

Consul-based. Consul is a complete service registration and discovery solution (comparable to etcd). To use it, Flink has to be integrated with Consul, for example by registering the job's corresponding service in Consul at submission time.
File-based. The file here is part of the Prometheus configuration and lists the target endpoints to pull. Outside of k8s this approach is rather clumsy, because the file must be local to Prometheus Server yet has to be kept up to date for every Flink job. In a k8s environment, however, it becomes fairly simple: mount a ConfigMap into the Prometheus Server pod and update that ConfigMap for each Flink job.
Kubernetes-based service discovery, which is essentially label selection; see the reference below (a scrape-config sketch follows it):

    https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
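A minimal sketch of a Prometheus scrape job using Kubernetes pod discovery; it keeps only pods carrying a prometheus.io/scrape annotation (this annotation convention is an assumption on our side, Flink/Spark do not set it by default):

    scrape_configs:
    - job_name: 'flink-jobs'
      kubernetes_sd_configs:
      - role: pod                     # discover targets from pods via the k8s API
      relabel_configs:
      # Keep only pods annotated with prometheus.io/scrape: "true"
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: "true"
      # Rewrite the scrape address to the port given in the prometheus.io/port annotation
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__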

For more service discovery mechanisms supported by Prometheus, please refer to: https://prometheus.io/docs/prometheus/latest/configuration/configuration/, a simple list includes:

Azure
consul
digitalocean
docker
dockerswarm
DNS
ec2
eureka
file
gce
hetzner
http
Kubernetes
...
Spark

Spark, as a representative of batch computing, would naturally fit the PushGateway approach for connecting to Prometheus. However, Spark does not provide PushGateway support; it only supports Prometheus' exporter format, which requires Prometheus Server to pull the data actively.

It is recommended to use a Kubernetes-based service discovery mechanism.

Note that Prometheus Server pulls metrics at a fixed interval; for batch jobs with a relatively short duration, the job may finish before its metrics have ever been pulled.

8. Defects

Although Spark and Flink both implement the native k8s mode, the specific implementations differ slightly, and in actual use both still show some flaws in certain scenarios.

Spark

Pods are not fault-tolerant. spark-submit first creates the driver pod directly, and the driver pod then creates the executor pods. However, in a k8s environment it is not recommended to create bare pod resources, because pods have no fault tolerance: if the node a pod is running on goes down, the pod dies with it. Anyone familiar with the k8s scheduler knows that a pod has a field called nodeName; the scheduler's core job is to fill in this field, i.e. to pick a suitable node for the pod. Once scheduling is done, that field is fixed, which is why pods have no node-level fault tolerance.

Flink

Deployment semantics. Deployment can be considered an enhanced version of ReplicaSet, and the official definition of ReplicaSet is as follows.

A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

Simply put, the purpose of a ReplicaSet is to keep a number of identical pod replicas running continuously; it is no exaggeration to say that it is tailor-made for online services (which are ideally stateless and can be restarted in place, such as a web service). Although Flink is mainly used for streaming jobs, a streaming job cannot simply be equated with a stateless web service. For example, if there is a bug in the job's main jar that makes the JobManager fail on startup, the Deployment semantics mean the JobManager pod will be restarted over and over again. This may be by design, but it does not feel great.

Batch job handling. Since all resources, including the Deployment, are cleaned up after a Flink job finishes, the final job status cannot be obtained afterwards and there is no way to know whether the job succeeded (for a streaming job, being stopped can be regarded as a failure). For this problem, Flink's own archive feature can be used to archive the result to an external file system (anything compatible with the s3 protocol, such as Alibaba Cloud object storage OSS). The configuration involved includes the following (a configuration sketch follows the list):

s3.access-key
s3.secret-key
s3.region
s3.endpoint
jobmanager.archive.fs.dir
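For reference, a flink-conf.yaml sketch with placeholder values for the options listed above:

    s3.access-key: <access-key>
    s3.secret-key: <secret-key>
    s3.region: <region>
    s3.endpoint: <endpoint>          # e.g. an S3-compatible object storage endpoint
    jobmanager.archive.fs.dir: s3://<bucket>/flink/completed-jobs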
If you do not want to introduce external systems, you need to modify the Flink code and write the data into the k8s API object after the job is completed, such as ConfigMap or Secret.

Job logs. After a Spark job finishes, the executor pods are cleaned up but the driver pod is retained, so the driver logs can still be viewed through it. After a Flink job finishes, no logs can be viewed at all.

9. Summary

This article has systematically introduced the implementation and practice of Spark and Flink on native k8s, comparing the two in terms of how they are used, how they are implemented in source code, and what has to be built around them in a production system. Due to space constraints, a lot of content could not be covered, such as how shuffle is handled. If your company is also working on this, I believe the article still has plenty of reference value; you are welcome to leave a comment and discuss.

In addition, the era of YARN is passing, and in the future running on k8s schedulers will become standard for big data computing and AI frameworks. However, the default k8s scheduler was designed for online services; it has significant shortcomings in throughput and is not well suited to big data workloads. The k8s community's batch scheduler kube-batch, the Volcano scheduler derived from it, and Yunikorn, a scheduler in the k8s ecosystem based on YARN's scheduling algorithms, have gradually emerged in the big-data-on-k8s scenario. But those are stories for another time; I will write separate articles to analyze and compare them when I have time.
