
Spark for EMR on EKS with Graviton Processors

Sep 26, 2022

Engineering

Featured

AWS Graviton processors deliver up to 40% better price/performance and are the perfect option for EKS clusters, reducing cloud costs further by eliminating over-provisioning or under-utilization of compute resources.

1. Introduction

Apache Spark on Kubernetes has been generally available since release 3.1.0 and is quickly gaining popularity as an alternative to YARN clusters, enabling efficient resource sharing, multi-cloud deployments, and access to a rich open-source ecosystem of tools ranging from monitoring and management to end-to-end data and machine learning pipelines. On Amazon EMR on EKS, Spark jobs can be deployed on Kubernetes clusters, sharing compute resources across applications while a single set of Kubernetes tools centralizes monitoring and management of the infrastructure.

Unfortunately, it is not straightforward to deploy Spark workloads to a Kubernetes cluster that mostly consists of Graviton-based instance types: The Spark operator does not seem to be compatible with architectures other than x86, and we encountered problems when using the manifests from official AWS guides. This article walks through an end-to-end example of running a demo application on an EKS cluster with Graviton or x86-based instances and on an EMR virtual cluster: The next section describes how customized Docker images with open-source and EMR runtimes can be created and published to ECR. Sample configuration steps for provisioning an EKS cluster and running an oss Spark job on it are covered in section 3. In the last part of this post, EMR on EKS is enabled and a job run is scheduled on a virtual cluster.

2. Customizing Docker Images

Kubernetes is a container orchestrator so applications need to be containerized before they can be submitted to an EKS cluster. On AWS, container images are distributed with the Elastic Container Registry service. The following subsections show how to build and publish custom Docker images with a Spark distribution and an EMR runtime. In the second half of this article, these images are deployed to an EKS cluster.

2.1 Prerequisites 

Container images are stored in repositories of a registry. For the two custom Docker images that will be used later, two private ECR repositories with the names spark_3.1.2_tpch and emr_6.5_tpch should be created:
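
The repositories can be created in the ECR console or, if the CLI is preferred, with two commands along these lines (names and region as used throughout this post):


$ aws ecr create-repository --repository-name spark_3.1.2_tpch --region us-east-1
$ aws ecr create-repository --repository-name emr_6.5_tpch --region us-east-1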

We build the images on EC2. The choice of instance type depends on the processor architecture of the EKS cluster nodes that will host the Spark pods: If ARM-based nodes will be provisioned, an m6g.xlarge instance is more than sufficient for creating the custom images. For nodes with x86 CPUs, an m5.xlarge instance should be launched. Amazon Linux 2 needs to be selected as the machine image and the root volume size should be set to around 20 GiB. When the instance is up and running, we log onto it and install a few dependencies:


$ sudo yum install docker
$ sudo service docker start
$ sudo amazon-linux-extras enable corretto8
$ sudo yum install java-1.8.0-amazon-corretto-devel
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0-amazon-corretto

It is advisable to set a few environment variables since this shortens the subsequent commands for pushing and pulling images:


$ export AWS_REGION=us-east-1
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

The format of a repository URL is determined by its visibility:

  • URLs for private ECR repositories follow the format account-id.dkr.ecr.region.amazonaws.com
  • Public repo URLs start with public.ecr.aws/ followed by an alias 

A variable that stores a private repo URL is defined as follows: 


$ export ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com

2.2 Building a Custom OSS Spark Image

The Apache Spark codebase already contains a Dockerfile that is suitable for creating a base image with a binary distribution from which more specialized images can be derived. As a first step, the Spark source code needs to be downloaded and extracted:


$ wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2.tgz
$ tar -xvzf spark-3.1.2.tgz

The AWS EKS guide suggests building Spark 3.1.2 against a member of the Hadoop 3.3 release line so that improvements for Kubernetes and S3 take effect. One entry of the root POM file has to be modified for this purpose:


$ sed -i 's|<hadoop.version>3.2.0</hadoop.version>|<hadoop.version>3.3.1</hadoop.version>|' spark-3.1.2/pom.xml

A runnable Spark distribution is then composed with the help of the make-distribution script:


$ ll
total 24164
drwxr-xr-x 30 ec2-user ec2-user     4096 May 24  2021 spark-3.1.2
-rw-rw-r--  1 ec2-user ec2-user 24738927 May 24  2021 spark-3.1.2.tgz
  
$ ./spark-3.1.2/dev/make-distribution.sh --name spark-oss --pip --tgz -Phadoop-cloud -Phadoop-3.2 -Pkubernetes -DskipTests

Now the base image can be created:


$ cd spark-3.1.2/dist
$ sudo docker build -f kubernetes/dockerfiles/spark/Dockerfile -t spark_3.1.2 --build-arg java_image_tag=8-jre-slim .
$ sudo docker image ls
REPOSITORY   TAG          IMAGE ID       CREATED              SIZE
spark_3.1.2  latest       19ca9b6276c9   About a minute ago   679MB
openjdk      8-jre-slim   85b121affedd   7 weeks ago          194MB

Since the sample program that is executed later on needs to access the TPC-H kit, a layer with this library should be added to the base image. Our Dockerfile contains all the instructions required for assembling a TPC-H image:


ARG BASE_IMAGE=755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0:latest

FROM amazonlinux:2 as tpc-toolkit

RUN yum group install -y "Development Tools" && \
    git clone https://github.com/databricks/tpch-dbgen.git /tmp/tpch-dbgen && \
    cd /tmp/tpch-dbgen && \
    git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8 -- bm_utils.c  && \
    make OS=LINUX

FROM ${BASE_IMAGE}

COPY --from=tpc-toolkit /tmp/tpch-dbgen /opt/tpch-dbgen

The TPC-H image is derived as follows:


$ cd ~
$ wget https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/Dockerfile
$ sudo docker build -t $ECR_URL/spark_3.1.2_tpch -f ./Dockerfile --build-arg BASE_IMAGE=spark_3.1.2 .
$ sudo docker image ls
REPOSITORY                                                      TAG          IMAGE ID       CREATED          SIZE
[...]
spark_3.1.2                                                     latest       19ca9b6276c9   3 minutes ago    679MB
000000000000.dkr.ecr.us-east-1.amazonaws.com/spark_3.1.2_tpch   latest       5e4f08f28cb5   38 seconds ago   682MB

Before the new image can be pushed to the spark_3.1.2_tpch repository, Docker must be authenticated to the registry:


$ aws ecr get-login-password --region $AWS_REGION | sudo docker login --username AWS --password-stdin $ECR_URL

After the authentication succeeds, the image is published:


$ sudo docker push $ECR_URL/spark_3.1.2_tpch

2.3 Building a Custom EMR Image

The spark_3.1.2_tpch image is not compatible with EMR on EKS as it encapsulates open source Spark binaries. Therefore, a similar image that embeds an EMR runtime needs to be created before the demo application can be used in a job run. EMR base images are publicly available; the different ECR registry accounts are listed on this page. For the us-east-1 region, the corresponding registry URL is 755674844232.dkr.ecr.us-east-1.amazonaws.com:


$ export ECR_PUBLIC=755674844232.dkr.ecr.us-east-1.amazonaws.com

An authentication step is required before accessing any official AWS image:


$ aws ecr get-login-password --region $AWS_REGION | sudo docker login --username AWS --password-stdin $ECR_PUBLIC

The EMR equivalent of the spark_3.1.2_tpch image can now be assembled. Because a Spark 3.1.2 distribution was compiled in the previous section, a base image with an emr-6.5.0 tag should be pulled, as this EMR release ships the same Spark version:


$ sudo docker pull $ECR_PUBLIC/spark/emr-6.5.0:latest
$ sudo docker image ls
REPOSITORY                                                     TAG          IMAGE ID       CREATED         SIZE
[...]
755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0   latest       77290d3aac6a   6 weeks ago     1.47GB

The Dockerfile that adds the TPC-H kit to the container filesystem is reused, but this time an EMR URI is passed as its base image argument:


$ ls
Dockerfile  spark-3.1.2  spark-3.1.2.tgz
$ sudo docker build -t $ECR_URL/emr_6.5_tpch -f ./Dockerfile --build-arg BASE_IMAGE=$ECR_PUBLIC/spark/emr-6.5.0:latest .
$ sudo docker image ls
REPOSITORY                                                     TAG          IMAGE ID       CREATED              SIZE
[...]
755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0   latest       77290d3aac6a   6 weeks ago          1.47GB
402730444140.dkr.ecr.us-east-1.amazonaws.com/emr_6.5_tpch      latest       779050788b59   About a minute ago   1.47GB

Finally, the derived EMR-TPCH image is uploaded to a dedicated ECR repository:


$ sudo docker push $ECR_URL/emr_6.5_tpch

After the images have been published, the EC2 instance that executed the build instructions is no longer needed and should be terminated.

3. Creating an EKS Cluster

The following paragraphs explain how a Kubernetes cluster can be deployed with EKS. In section 4, a Spark application is submitted to this cluster and in the last part of this article, EMR is registered with it and a job run is initiated.

3.1 Prerequisites

EKS clusters can be launched from local development environments, but the command-line tools kubectl and eksctl need to be installed; more information is provided on this page. The architecture and basic concepts of Kubernetes are described here.
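
A quick way to confirm that the required tools are installed and on the PATH (the exact version output will differ):


$ kubectl version --client
$ eksctl version
$ aws --version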

3.2 Cluster Configuration

Our eks-cluster-arm.yaml and eks-cluster-x86.yaml manifests contain the configuration details for a basic EKS cluster.

eks-cluster-arm.yaml


---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: eks-test-cluster
  region: us-east-1 # ToDo: Modify
  version: "1.23"
availabilityZones: [ "us-east-1a", "us-east-1d" ] # ToDo: Modify
vpc:
  nat:
    gateway: HighlyAvailable
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
cloudWatch: # Control Plane logs
  clusterLogging:
    enableTypes: [ "*" ]
iam:
  withOIDC: true
  serviceAccounts:
    - metadata:
        name: cluster-autoscaler
        namespace: kube-system
        labels: { aws-usage: "cluster-ops" }
      wellKnownPolicies:
        autoScaler: true
      roleName: eks-test-autoscaler-role
managedNodeGroups:
  - name: tooling
    instanceType: t3.large
    desiredCapacity: 1
    volumeSize: 20
    labels:
      noderole: tooling
    tags:
      k8s.io/cluster-autoscaler/node-template/label/noderole: tooling
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
nodeGroups:
  - name: driver-group
    availabilityZones: [ "us-east-1d" ] # ToDo: Modify
    desiredCapacity: 1
    volumeSize: 15
    privateNetworking: true
    instanceType: "m6gd.large"
    labels:
      arch: arm64
      noderole: driver
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned"
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV}; mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a
  - name: exec-group
    availabilityZones: [ "us-east-1d" ] # ToDo: Modify
    desiredCapacity: 2
    volumeSize: 15
    privateNetworking: true
    instanceType: "m6gd.2xlarge"
    labels:
      arch: arm64
      noderole: executor
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned"
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV}; mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a


If the application images support ARM architectures, the file with the matching arm infix needs to be downloaded as it specifies worker nodes with this CPU architecture:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-cluster-arm.yaml > eks-test-cluster.yaml

The manifest eks-cluster-x86.yaml should be used for the alternative x86 architecture option:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-cluster-x86.yaml > eks-test-cluster.yaml

A dedicated VPC will be created for the cluster, so only a few lines that reference availability zones and are marked with ToDo comments may have to be modified. The test cluster can be spawned with the eksctl command-line tool:


# Make modifications to eks-test-cluster.yaml if required
$ eksctl create cluster -f eks-test-cluster.yaml 

As part of the cluster creation process that is started by the last command, several infrastructure resources are provisioned and configured via CloudFormation. These operations tend to be time-consuming and can take more than half an hour to complete.

If the cluster launch is successful, the final log message will contain the following statement:


[✔]  EKS cluster "eks-test-cluster" in "us-east-1" region is ready

If errors appear on the terminal, it might be necessary to manually delete cluster-related stacks in the CloudFormation console and re-execute the eksctl command.

Eventually, an EKS cluster that consists of four EC2 instances will be up and running. The local kubeconfig file should have been updated with an entry for eks-test-cluster, in which case the kubectl/eksctl CLIs point to the new cluster. If the commands used below return nothing or information about a different cluster, the kubeconfig documentation should be consulted.
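
If the context was not updated automatically, the kubeconfig entry can be (re)written with the AWS CLI and the active context verified; a short sketch:


$ aws eks update-kubeconfig --name eks-test-cluster --region us-east-1
$ kubectl config current-context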


$ kubectl get nodes
NAME                             STATUS   ROLES    AGE   VERSION
ip-192-xxx-xxx-64.ec2.internal   Ready    <none>   18m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-87.ec2.internal   Ready    <none>   18m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-19.ec2.internal   Ready    <none>   23m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-17.ec2.internal   Ready    <none>   20m   v1.23.9-eks-ba74326

The four instances listed in the terminal output above are grouped into three node pools:


$ eksctl get nodegroup --cluster=eks-test-cluster
CLUSTER        NODEGROUP       STATUS             DES. CAP. INSTANCE TYPE  TYPE
eks-test-cluster tooling       ACTIVE             1         t3.large       managed
eks-test-cluster driver-group  CREATE_COMPLETE    1         m6gd.large     unmanaged
eks-test-cluster exec-group    CREATE_COMPLETE    2         m6gd.2xlarge   unmanaged

Cluster add-on pods and other processes that provide core K8s functionality (such as CoreDNS) run on a low-cost t3.large instance in the tooling nodegroup, while the EKS control plane itself is managed by AWS. The Spark driver pod will be hosted on a dedicated m6gd.large (or m5d.large for x86) instance. Executor pods can be assigned to two m6gd.2xlarge (or m5d.2xlarge) workers which comprise the exec-group nodegroup. No Spark pods have been launched yet, but this will change during the next two sections:


$ kubectl get pods -A
NAMESPACE     NAME                      READY   STATUS    RESTARTS   AGE
kube-system   aws-node-6npz4            1/1     Running   0          27m
kube-system   aws-node-m4xfw            1/1     Running   0          27m
kube-system   aws-node-w2pzq            1/1     Running   0          29m
kube-system   aws-node-wfsxl            1/1     Running   0          32m
kube-system   coredns-7f5998f4c-42m7r   1/1     Running   0          42m
kube-system   coredns-7f5998f4c-hj44f   1/1     Running   0          42m
kube-system   kube-proxy-frtf8          1/1     Running   0          27m
kube-system   kube-proxy-hfmdz          1/1     Running   0          32m
kube-system   kube-proxy-jg7g2          1/1     Running   0          27m
kube-system   kube-proxy-nn4qp          1/1     Running   0          29m

The sample program that will run on the cluster was introduced in our previous post and consists of 22 benchmark queries. To keep run times comparable, node reclaiming events and up/downscaling will be avoided: all four cluster nodes are provisioned as on-demand instances and, later on, neither dynamic allocation nor autoscaling will be activated. For workloads with less stringent requirements, dynamic scaling and a few spot instances hosting Spark executors tend to be much more cost-effective.

3.3 Mounting NVMe Disks

Both the m6gd and m5d families come with instance store volumes, namely local NVMe SSDs. Thanks to their high I/O performance, these devices are very good storage backends for the temporary files that Spark creates. In our previous benchmarks, we have seen significant run time differences between AWS storage options for shuffle-intensive queries like TPC-H Q18. NVMe-based SSDs are not automatically set up during an EKS cluster launch, which is an important difference from EMR on EC2. For this reason, our cluster manifests contain preBootstrapCommands sections that perform the formatting and disk mounts. After the bootstrapping phase of an EKS cluster has completed, the disk setup can be checked by connecting to one of the nodes with an instance store volume. This requires two changes in the exec-group section of the cluster definition file:

  • The value for the privateNetworking field should be set to false so the associated instances receive a public IPv4 DNS.
  • An ssh entry similar to the following one should be inserted:

       ssh:
         allow: true
         publicKeyName: MyKeyName

After adding an ssh inbound rule to the security group of the target node, we can ssh into it and confirm the volume mounts:


[ec2-user@ip-192-xxx-xxx-87 ~]$ sudo nvme list
Node SN Model Namespace Usage Format FW Rev 
---------------- -------------------- ---------------------------------------- --------- -------------------------- ---------------- --------
/dev/nvme0n1 vol068d381d5e4ac15b8 Amazon Elastic Block Store 1 21.47 GB / 21.47 GB 512 B + 0 B 1.0 
/dev/nvme1n1 AWS6A06EB0611850CECB Amazon EC2 NVMe Instance Storage 1 474.00 GB / 474.00 GB 512 B + 0 B 0

[ec2-user@ip-192-xxx-xxx-87 ~]$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
nvme0n1 259:0 0 15G 0 disk 
├─nvme0n1p1 259:2 0 15G 0 part /
└─nvme0n1p128 259:3 0 10M 0 part /boot/efi
nvme1n1 259:1 0 441.5G 0 disk /local

According to the terminal output, two devices are attached to the instance: The root EBS volume nvme0n1 of size 15 G has two partitions whose mount points are / and /boot/efi. The instance store volume nvme1n1 is mounted at the directory /local. In sections 4 and 5, Spark applications get configured to write shuffle files to a directory under /local so the fast NVMe SSDs are used for the scratch space.

3.4 Access Management

If the demo application were submitted to the Kubernetes cluster at this point, failures would occur since the pods have not yet been granted permission to perform actions on external resources like S3. To avoid this, an IAM policy with the required S3 and logging permissions is defined first.

eks-test-policy.json


{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::$S3_TEST_BUCKET",
        "arn:aws:s3:::$S3_TEST_BUCKET/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams",
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:*:*:*"
      ]
    }
  ]
}

A few environment variables should be declared since subsequent commands make multiple references to account details that change from user to user: S3_TEST_BUCKET holds the name of the S3 bucket under which the application input and results are stored; the account ID is retrieved via the CLI and assigned to the variable ACCOUNT_ID:


$ export S3_TEST_BUCKET=xonai
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

An IAM policy named eks-test-policy can now be defined easily:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-test-policy.json | envsubst > eks-test-policy.json
$ aws iam create-policy --policy-name eks-test-policy --policy-document file://eks-test-policy.json

Pods do not use an IAM policy directly; the association happens through a Kubernetes service account, which is created as follows:


$ eksctl create iamserviceaccount --name oss-sa --namespace oss --cluster eks-test-cluster --attach-policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy --approve --override-existing-serviceaccounts

This single eksctl command accomplishes multiple tasks:

  1. A namespace oss is created.
  2. An IAM role and service account pair is created.
  3. The IAM role is mapped to the service account oss/oss-sa.
  4. The policy eks-test-policy is attached to the newly created role.

In section 4.1, the job manifest configures the Spark pods to use the oss/oss-sa service account so they receive the permissions declared in eks-test-policy. The policy has to be created only once, but the create iamserviceaccount command needs to be repeated whenever a new EKS cluster is provisioned.
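
Whether the mapping succeeded can be checked from both sides; a quick sketch:


$ kubectl get serviceaccount oss-sa -n oss -o yaml   # should carry an eks.amazonaws.com/role-arn annotation
$ eksctl get iamserviceaccount --cluster eks-test-cluster --namespace oss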

3.5 The Kubernetes Dashboard

A useful tool in the Kubernetes ecosystem is the Dashboard, a web-based UI that visualizes a cluster and its K8s objects. Unfortunately, it is not pre-installed on EKS clusters, so a number of steps are required before it can be accessed. The Dashboard can be deployed as follows:


$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.6.1/aio/deploy/recommended.yaml

Accessing the UI is more involved than installing it; the most straightforward authentication strategy consists of using a bearer token. First, the kubectl proxy is started:


$ kubectl proxy
Starting to serve on 127.0.0.1:8001

Copy-pasting the URL below into a browser window opens the Dashboard login view; the port number (8001 here) may have to be adjusted depending on the proxy output:


http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy

To pass through the login page, an authentication token needs to be retrieved from a new terminal window:


$ aws eks get-token --cluster-name eks-test-cluster | jq -r '.status.token'

After pasting the returned string into the Enter Token field of the login page, the UI should appear:

No applications have been submitted yet but the UI is still populated: After the All namespaces option is selected from the drop-down menu near the top-left corner, the Workloads tab displays a few active Kubernetes objects that provide core cluster and Dashboard functionality. When the first benchmark workload is deployed in the next section, a new Jobs entry along with multiple Spark pods will be displayed:

4. Using OSS Spark on EKS

After following the instructions from the last section, an EKS cluster that is configured to access AWS resources will be running. The custom oss Spark image that was created at the beginning of this article can now be used in the first workload.

4.1 The Kubernetes Job Configuration

For our previous benchmark article, we executed TPC-H queries via spark-submit on EC2 and on EMR. The same program can be run on an EKS cluster by integrating it with a Kubernetes Job. Our oss-job.yaml manifest follows this approach: It instantiates a container from the spark_3.1.2_tpch image and invokes spark-submit in it. The most important segments of this job configuration are described in more detail below.

oss-job.yaml


apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: oss-sa
    namespace: oss
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: template-map
  namespace: oss
data:
  driver: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        noderole: driver
      initContainers:
      - name: volume-permissions
        image: public.ecr.aws/docker/library/busybox
        command: ['sh', '-c', 'mkdir /local/scratch; chown -R 185 /local/scratch']
        volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

  executor: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        noderole: executor
      initContainers:
      - name: volume-permissions
        image: public.ecr.aws/docker/library/busybox
        command: ['sh', '-c', 'mkdir /local/scratch; chown -R 185 /local/scratch']
        volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1
---
apiVersion: batch/v1
kind: Job
metadata:
  name: oss-tpch-job
  namespace: oss
spec:
  template:
    spec:
      containers:
        - name: job-container
          image: $OSS_IMAGE_URI
          args: [
            "/bin/sh",
            "-c",
            "/opt/spark/bin/spark-submit \
            --master k8s://https://kubernetes.default.svc.cluster.local:443 \
            --deploy-mode cluster \
            --name oss-tpch-spark \
            --class com.xonai.RunMainTPC \
            --conf spark.dynamicAllocation.enabled=false \
            --conf spark.driver.memory=4G \
            --conf spark.executor.memory=10G \
            --conf spark.executor.cores=3 \
            --conf spark.executor.instances=4 \
            --conf spark.sql.shuffle.partitions=250 \
            --conf spark.kubernetes.container.image=$OSS_IMAGE_URI \
            --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
            --conf spark.kubernetes.container.image.pullPolicy=Always \
            --conf spark.kubernetes.authenticate.driver.serviceAccountName=oss-sa \
            --conf spark.kubernetes.namespace=oss \
            --conf spark.kubernetes.driver.label.spark/component=driver \
            --conf spark.kubernetes.executor.label.spark/component=executor \
            --conf spark.kubernetes.driver.podTemplateFile='/opt/spark/conf/driver_pod_template.yaml' \
            --conf spark.kubernetes.executor.podTemplateFile='/opt/spark/conf/executor_pod_template.yaml' \
            --conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.mount.path='/local/scratch' \
            --conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.options.path='/local/scratch' \
            --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.mount.path='/local/scratch' \
            --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.options.path='/local/scratch' \
            --conf spark.local.dir='/local/scratch' \
            --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
            --conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
            --conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
            s3a://$S3_TEST_BUCKET/cloudbench-assembly-1.0.jar \
            \"s3a://$S3_TEST_BUCKET/tpch_input\" \
            \"s3a://$S3_TEST_BUCKET/tpch_result\" \
            \"/opt/tpch-dbgen\" \
            \"100\"
            \"1\""
          ]
          volumeMounts:
            - name: template-volume
              mountPath: /opt/spark/conf/driver_pod_template.yaml
              subPath: driver
            - name: template-volume
              mountPath: /opt/spark/conf/executor_pod_template.yaml
              subPath: executor
      serviceAccountName: oss-sa
      restartPolicy: Never
      volumes:
        - name: template-volume
          configMap:
            name: template-map
            defaultMode: 420
  backoffLimit: 4

The main program that gets called by spark-submit in line 66 is RunMainTPC.scala. It is packaged into the JAR cloudbench-assembly-1.0.jar; the steps required for creating this file are described in our GitHub repository. RunMainTPC expects five arguments, which are specified right after the JAR file on lines 91 to 95 of the Job definition.
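
For reference, a hedged reading of those five positional arguments, based on the values passed in the Job spec above (RunMainTPC.scala is the authoritative source; the interpretation of the last value as an iteration count is our assumption):


# 1. s3a://$S3_TEST_BUCKET/tpch_input    location of the TPC-H input dataset
# 2. s3a://$S3_TEST_BUCKET/tpch_result   location where benchmark results are written
# 3. /opt/tpch-dbgen                     path to the TPC-H kit inside the container
# 4. 100                                 TPC-H scale factor, i.e. roughly 100 GB of input data
# 5. 1                                   assumed: number of benchmark iterations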

Not all relevant pod settings can be directly defined with custom flags in the spark-submit command. Therefore, the manifest uses the pod template mechanism for accomplishing two important configuration tasks:

  • Node selectors are declared which constrain the set of worker nodes to which driver and executor pods can be assigned. For example, the driver pod can only be scheduled on an m6gd.large instance (or m5d.large for x86) as no other cluster node was tagged with the label noderole: driver. The node labels can be verified with the kubectl command after this list.
  • Init containers are added; they prepare the volume mount (creating the scratch directory and adjusting its ownership) so Spark can write scratch data under the mount point directory of the fast NVMe volume.
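
The label-to-node assignment relied upon by the pod templates can be checked quickly; a minimal sketch:


$ kubectl get nodes -L noderole,arch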

The template content is stored as key-value pairs in the data field of the ConfigMap template-map in line 19. This map gets mounted as a volume into the job container: The volumes section of the Job spec (line 106) references template-map and copies its contents into the driver and executor template files whose paths are specified in the volumeMounts field. Spark pods can then read the configuration data from the file mounts.
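
Once the Job from section 4.2 has been applied, the rendered templates can be inspected directly from the ConfigMap; a small check:


$ kubectl get configmap template-map -n oss -o yaml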

It is important that the NVMe devices that are attached to the m6gd.2xlarge (or m5d.2xlarge) instances are used for the Spark scratch space. This can be validated by connecting to a node that hosts executors (see 3.3) and checking its disk space while a query with a larger shuffle is executed:


[ec2-user@ip-192-xxx-xxx-89 ~]$ df -H
Filesystem        Size  Used Avail Use% Mounted on
devtmpfs           17G     0   17G   0% /dev
tmpfs              17G     0   17G   0% /dev/shm
tmpfs              17G  1.1M   17G   1% /run
tmpfs              17G     0   17G   0% /sys/fs/cgroup
/dev/nvme0n1p1     17G  3.8G   13G  24% /
/dev/nvme0n1p128   11M  4.0M  6.6M  38% /boot/efi
/dev/nvme1n1      474G   19G  459G   5% /local
tmpfs             3.3G     0  3.3G   0% /run/user/1000

In oss-job.yaml, the application property spark.local.dir is set to /local/scratch in line 86 so temporary data (e.g., shuffle files) are written to this directory. According to the terminal output, the mount point directory /local is associated with the NVMe volume, which held around 19G of Spark scratch data when the df command was entered. This number is higher than the total root volume size and confirms the correctness of the disk setup.
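
The same check also works without SSH access by running df inside one of the executor containers; a sketch that assumes the executor pod names from section 4.2 (the image is Debian-based, so df is available):


$ kubectl exec -n oss tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-1 -- df -h /local/scratch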

4.2 Running the Kubernetes Job

The batch job described in oss-job.yaml can be deployed as follows:


$ export S3_TEST_BUCKET=xonai
$ export AWS_REGION=us-east-1
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ export OSS_IMAGE_URI=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/spark_3.1.2_tpch:latest
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/oss-job.yaml | envsubst > oss-job.yaml
# Modify the program arguments in lines 91 to 95 if required
$ kubectl apply -f oss-job.yaml

After the last command is entered, a job pod launches almost immediately. Its container invokes the spark-submit process and a Spark driver pod is scheduled. The driver contacts the K8s API server with a request for additional executor pods. After one or two minutes, six pods in total should be up and running in the oss namespace:


$ kubectl get jobs -n oss
NAME           COMPLETIONS   DURATION   AGE
oss-tpch-job   0/1           90s        90s

$ kubectl get pods -n oss
NAME                                                 READY   STATUS    RESTARTS   AGE
oss-tpch-job-dbvzk                                   1/1     Running   0          94s
oss-tpch-spark-5d3d7f83412a3f5c-driver               1/1     Running   0          74s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-1   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-2   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-3   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-4   1/1     Running   0          53s

The names in the terminal output above indicate that four executor pods were launched, which corresponds to the spark.executor.instances=4 setting in the Job spec, as one Spark executor runs in one container/pod.

The Spark web interface can be accessed after forwarding the local port 4040 to the same port on the driver pod:


$ kubectl port-forward oss-tpch-spark-5d3d7f83412a3f5c-driver 4040:4040 -n oss
Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040

The UI can now be viewed at http://localhost:4040/:

For our demo run, a TPC-H scale factor of 100 was used, so the executors need to process 100 GB of input data. This takes around 20 minutes:


$ kubectl get pods -n oss
NAME                                      READY   STATUS      RESTARTS   AGE
oss-tpch-job-dbvzk                        0/1     Completed   0          22m
oss-tpch-spark-5d3d7f83412a3f5c-driver    0/1     Completed   0          22m

After the Spark application has succeeded, the Job object and the driver pod linger on in a completed state. Therefore, the driver log can still be inspected from the command-line:


$ kubectl logs oss-tpch-spark-5d3d7f83412a3f5c-driver -n oss
[...]
   running Thread[benchmark runner,5,main]
   Execution time: 18.987546802s
   Results written to table: 'sqlPerformance' at s3a://xonai/result/timestamp=1663172601987

During the final stage, the application writes a JSON file to S3 that contains execution information like query run times. The Job object needs to be deleted before the application can be rerun:


$ kubectl delete job oss-tpch-job -n oss
job.batch "oss-tpch-job" deleted
$ kubectl delete pod oss-tpch-spark-5d3d7f83412a3f5c-driver -n oss
pod "oss-tpch-spark-5d3d7f83412a3f5c-driver" deleted

5. Using EMR on EKS

Before an EMR job can be submitted to a Kubernetes cluster, a few objects and relationships that facilitate the interplay between the EKS and EMR services need to be created. The next section walks through the necessary setup steps; the last part describes how an EMR workload can be deployed.

5.1 Setting Up a Virtual Cluster

The first step in preparing the existing cluster for EMR on EKS is to create a new IAM role that will be used with the virtual cluster:


$ aws iam create-role --role-name emr-test-role --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "eks.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'

The policy that was created at the beginning of 3.4 can be used by more than one IAM identity. After eks-test-policy is attached to the new execution role, the role has the same permissions as the one backing the service account oss/oss-sa that was used by the first workload:


$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ aws iam attach-role-policy --role-name emr-test-role --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy

In 4.2, the Kubernetes Job was deployed under the oss namespace; a separate namespace dedicated to EMR should be created now. When EMR is registered with it, a virtual cluster is launched.


$ kubectl create namespace emr

EMR processes still lack permissions to use the new namespace. This can be rectified with an identity mapping command:


$ export AWS_REGION=us-east-1 
$ eksctl create iamidentitymapping --region $AWS_REGION --cluster eks-test-cluster --namespace emr --service-name "emr-containers"
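
The new mapping can be verified with the corresponding get command; a quick sketch:


$ eksctl get iamidentitymapping --cluster eks-test-cluster --region $AWS_REGION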

In the penultimate step, a trust relationship between emr-test-role and EMR service accounts is created, so the latter can use the role credentials. It is not necessary to manually set up an EMR-managed service account; this is done automatically upon job run submission.


$ aws emr-containers update-role-trust-policy \
   --cluster-name eks-test-cluster  \
   --namespace emr \
   --role-name emr-test-role

Finally, the virtual EMR cluster is launched:


$ aws emr-containers create-virtual-cluster --name emr-test-cluster \
    --container-provider '{
        "id": "'eks-test-cluster'",
        "type": "EKS",
        "info": { "eksInfo": { "namespace": "'emr'" } }
    }'

In the EMR console, the Virtual clusters section at the bottom left should now list a new active cluster:

The EMR console does not (yet) provide a lot of functionality when it comes to orchestrating virtual clusters and job runs; most actions need to originate from the command line. For example, the following command, which will be used again in the next section, retrieves the details of the virtual cluster:


$ aws emr-containers list-virtual-clusters --query "virtualClusters[?name == 'emr-test-cluster' && state == 'RUNNING']"
[
    {
        "id": "2jf0w2hwzpia1f6yb5hyog9oa",
        "name": "emr-test-cluster",
        "arn": "arn:aws:emr-containers:us-east-1:000000000000:/virtualclusters/2jf0w2hwzpia1f6yb5hyog9oa",
        "state": "RUNNING",
        "containerProvider": {
            "type": "EKS",
            "id": "eks-test-cluster",
            "info": {
                "eksInfo": {
                    "namespace": "emr"
                }
            }
        },
        "createdAt": "2022-09-23T15:45:05+00:00",
        "tags": {}
    }
]

5.2 Scheduling a Job Run

With the EMR on EKS setup described above, it is more convenient to place the pod template contents in files on S3 than to populate and mount a ConfigMap. The two manifests driver_pod_template.yaml and exec_pod_template.yaml contain pod configuration data similar to that of the template-map ConfigMap in the K8s Job specification:

driver_pod_template.yaml


apiVersion: v1
kind: Pod
spec:
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: /local/scratch
  nodeSelector:
    noderole: driver
  containers:
    - name: spark-kubernetes-driver
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /local/scratch
  initContainers:
    - name: volume-permissions
      image: public.ecr.aws/docker/library/busybox
      command: [ 'sh', '-c', 'mkdir /local/scratch; chown -R 999:1000 /local/scratch' ]
      volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

exec_pod_template.yaml


apiVersion: v1
kind: Pod
spec:
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: /local/scratch
  nodeSelector:
    noderole: executor
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /local/scratch
  initContainers:
    - name: volume-permissions
      image: public.ecr.aws/docker/library/busybox
      command: [ 'sh', '-c', 'mkdir /local/scratch; chown -R 999:1000 /local/scratch' ]
      volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

Both template files should be copied to S3:


$ export S3_TEST_BUCKET=xonai
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/driver_pod_template.yaml > driver_pod_template.yaml
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/exec_pod_template.yaml > exec_pod_template.yaml
$ aws s3 cp driver_pod_template.yaml s3://$S3_TEST_BUCKET/
$ aws s3 cp exec_pod_template.yaml s3://$S3_TEST_BUCKET/ 

Our benchmark program can now be submitted to the virtual cluster. The emr-job.json file contains the EMR equivalent of the Kubernetes Job definition from 4.1. While the latter ultimately relies on open-source Spark, the job run initiated below uses EMR release 6.5.0 and the custom image that embeds the EMR runtime:

emr-job.json


{
  "virtualClusterId": "$VIRTUAL_ID",
  "name": "emr-tpch-job",
  "executionRoleArn": "arn:aws:iam::$ACCOUNT_ID:role/emr-test-role",
  "releaseLabel": "emr-6.5.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://$S3_TEST_BUCKET/cloudbench-assembly-1.0.jar",
      "entryPointArguments": [
        "s3://$S3_TEST_BUCKET/tpch_input",
        "s3://$S3_TEST_BUCKET/tpch_result",
        "/opt/tpch-dbgen",
        "100",
        "1"
      ],
      "sparkSubmitParameters": "--name emr-tpch-spark --class com.xonai.RunMainTPC --conf spark.driver.memory=4G --conf spark.executor.cores=3 --conf spark.executor.memory=10G --conf spark.executor.instances=4 --conf spark.sql.shuffle.partitions=250"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.kubernetes.container.image": "$EMR_IMAGE_URI",
          "spark.kubernetes.driver.podTemplateFile": "s3://$S3_TEST_BUCKET/driver_pod_template.yaml",
          "spark.kubernetes.executor.podTemplateFile": "s3://$S3_TEST_BUCKET/exec_pod_template.yaml",
          "spark.dynamicAllocation.enabled": "false",
          "spark.local.dir": "/local/scratch"
        }
      }
    ],
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {
        "logUri": "s3://$S3_TEST_BUCKET/elasticmapreduce/emr-containers"
      }
    }
  }
}

An EMR job run can be started as follows:


$ export S3_TEST_BUCKET=xonai
$ export AWS_REGION=us-east-1 
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ export VIRTUAL_ID=$(aws emr-containers list-virtual-clusters --query "virtualClusters[?name == 'emr-test-cluster' && state == 'RUNNING'].id" --output text)
$ export EMR_IMAGE_URI=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/emr_6.5_tpch:latest

$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/emr-job.json | envsubst > emr-job.json
# Modify the program arguments in lines 10 to 14 if required
$ aws emr-containers start-job-run --cli-input-json file://./emr-job.json
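
The start-job-run call returns a small JSON document that includes the id of the new run; capturing it at submission time makes later status checks easier. A hedged variant of the submission plus a follow-up query:


$ export JOB_RUN_ID=$(aws emr-containers start-job-run --cli-input-json file://./emr-job.json --query id --output text)
$ aws emr-containers describe-job-run --virtual-cluster-id $VIRTUAL_ID --id $JOB_RUN_ID --query jobRun.state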

Similar to the execution of the oss Spark job, six pods in total get scheduled after initiating the job run:


$ kubectl get pods -n emr
NAME                                                 READY   STATUS    RESTARTS   AGE
000000030rb1lh9ikmj-cf268                            3/3     Running   0          2m47s
spark-000000030rb1lh9ikmj-driver                     2/2     Running   0          2m35s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-1   2/2     Running   0          2m3s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-2   2/2     Running   0          2m3s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-3   2/2     Running   0          2m2s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-4   2/2     Running   0          2m2s

Clicking on the emr-test-cluster link in the EMR console’s Virtual Clusters window opens a job run view. The recently launched job should be listed in a running state:

After the application completes, all associated pods get deleted almost immediately, which is a difference from the oss counterpart in 4.2. The final status of the job run is displayed on the EMR console and can be fetched from the command line:


$ kubectl get pods -n emr
No resources found in emr namespace.
$ aws emr-containers list-job-runs --virtual-cluster-id $VIRTUAL_ID
{
    "jobRuns": [
        {
            "id": "000000030rb1lh9ikmj",
            "name": "emr-tpch-job",
            "virtualClusterId": "2jf0w2hwzpia1f6yb5hyog9oa",
            "arn": "arn:aws:emr-containers:us-east-1:000000000000:/virtualclusters/2jf0w2hwzpia1f6yb5hyog9oa/jobruns/000000030rb1lh9ikmj",
            "state": "COMPLETED",
            "executionRoleArn": "arn:aws:iam::000000000000:role/emr-test-role",
            "releaseLabel": "emr-6.5.0-latest",
            "createdAt": "2022-09-23T15:53:02+00:00",
            "finishedAt": "2022-09-23T16:12:19+00:00",
            "stateDetails": "JobRun completed successfully. It ran for 18 Minutes 45 Seconds",
            "tags": {}
        },
[...]

5.3 Cleaning Up

After all applications have completed, the virtual cluster and underlying EKS cluster should be deleted:


$ aws emr-containers delete-virtual-cluster --id $VIRTUAL_ID
$ eksctl delete cluster -n eks-test-cluster

Deleting a virtual cluster does not require any operations on infrastructure resources, so the first command completes almost instantly. The termination of the physical EKS cluster involves deprovisioning the nodes and deleting the CloudFormation stacks, which can take a few minutes.
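
If the ECR repositories, the IAM policy, and the EMR execution role created for this walkthrough are no longer needed either, they can be removed as well; a hedged cleanup sketch (the policy can only be deleted once it is no longer attached to any role):


$ aws ecr delete-repository --repository-name spark_3.1.2_tpch --force
$ aws ecr delete-repository --repository-name emr_6.5_tpch --force
$ aws iam detach-role-policy --role-name emr-test-role --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy
$ aws iam delete-role --role-name emr-test-role
$ aws iam delete-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy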

1. Introduction

Apache Spark on Kubernetes is generally available since release 3.1.0 and is quickly gaining popularity as an alternative to YARN clusters, enabling efficient resource sharing, multi cloud deployments and access to a rich open-source ecosystem of tools ranging from monitoring and management to end-to-end data and machine learning pipelines. On Amazon EMR on EKS, Spark jobs can be deployed on Kubernetes clusters, sharing compute resources across applications with a single set of Kubernetes tools centralizing monitoring and management of infrastructure.

Unfortunately, it is not straightforward to deploy Spark workloads to a Kubernetes cluster that is mostly comprised of Graviton-based instance types: The Spark operator does not seem to be compatible with architectures other than x86 and we encountered problems when using the manifests from official AWS guides. This article walks through an end-to-end example of running a demo application on an EKS cluster with Graviton or x86-based instances and on an EMR virtual cluster: The next section describes how customized Docker images with open-source and EMR runtimes can be created and published to ECR. Sample configuration steps for provisioning an EKS cluster and running an oss Spark job on it are covered in section 3. In the last part of this post, EMR on EKS is enabled and a job run on a virtual cluster is scheduled.

2. Customizing Docker Images

Kubernetes is a container orchestrator so applications need to be containerized before they can be submitted to an EKS cluster. On AWS, container images are distributed with the Elastic Container Registry service. The following subsections show how to build and publish custom Docker images with a Spark distribution and an EMR runtime. In the second half of this article, these images are deployed to an EKS cluster.

2.1 Prerequisites 

Container images are stored in repositories of a registry. For the two custom Docker images that will be used later, two private ECR repositories with the names spark_3.1.2_tpch and emr_6.5_tpch should be created:

We build the images on EC2. The choice of the instance type depends on the processor architecture of the EKS cluster nodes that will host Spark pods: When ARM-based nodes will be provisioned, the m6g.xlarge instance type is more than sufficient for creating the custom images. For nodes with x86 architecture CPUs, an m5.xlarge instance should be launched. Amazon Linux 2 needs to be selected as the machine image and the root volume size should be set to around 20 GiB. When the instance is up and running, we log onto it and install a few dependencies: 


$ sudo yum install docker
$ sudo service docker start
$ sudo amazon-linux-extras enable corretto8
$ sudo yum install java-1.8.0-amazon-corretto-devel
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0-amazon-corretto

It is advisable to set a few environment variables, this shortens subsequent commands for pushing and pulling images:


$ export AWS_REGION=us-east-1
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

The format of a repository URL is determined by its visibility:

  • URLs for private ECR repositories follow the format account-id.dkr.ecr.region.amazonaws.com
  • Public repo URLs start with public.ecr.aws/ followed by an alias 

A variable that stores a private repo URL is defined as follows: 


$ export ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com

2.2 Building a Custom OSS Spark Image

The Apache Spark codebase already contains a Dockerfile that is suitable for creating a base image with a binary distribution from which more specialized images can be derived. As first step, the Spark source code needs to be downloaded and extracted:


$ wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2.tgz
$ tar -xvzf spark-3.1.2.tgz

The AWS EKS guide suggests to build Spark 3.1.2 with a member of the Hadoop 3.3 release line so improvements for Kubernetes and S3 can take effect. One entry of the root POM file has to be modified for this purpose:


$ sed -i 's|<hadoop.version>3.2.0</hadoop.version>|<hadoop.version>3.3.1</hadoop.version>|' spark-3.1.2/pom.xml

A runnable Spark distribution is then composed with the help of the make-distribution script:


$ ll
total 24164
drwxr-xr-x 30 ec2-user ec2-user     4096 May 24  2021 spark-3.1.2
-rw-rw-r--  1 ec2-user ec2-user 24738927 May 24  2021 spark-3.1.2.tgz
  
$ ./spark-3.1.2/dev/make-distribution.sh --name spark-oss --pip --tgz -Phadoop-cloud -Phadoop-3.2 -Pkubernetes -DskipTests

Now the base image can be created:


$ cd spark-3.1.2/dist
$ sudo docker build -f kubernetes/dockerfiles/spark/Dockerfile -t spark_3.1.2 --build-arg java_image_tag=8-jre-slim .
$ sudo docker image ls
REPOSITORY   TAG          IMAGE ID       CREATED              SIZE
spark_3.1.2  latest       19ca9b6276c9   About a minute ago   679MB
openjdk      8-jre-slim   85b121affedd   7 weeks ago          194MB

Since the sample program that is executed later on needs to access the TPC-H kit, a layer with this library should be added to the base image. Our Dockerfile contains all the instructions required for assembling a TPC-H image:


ARG BASE_IMAGE=755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0:latest

FROM amazonlinux:2 as tpc-toolkit

RUN yum group install -y "Development Tools" && \
    git clone https://github.com/databricks/tpch-dbgen.git /tmp/tpch-dbgen && \
    cd /tmp/tpch-dbgen && \
    git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8 -- bm_utils.c  && \
    make OS=LINUX

FROM ${BASE_IMAGE}

COPY --from=tpc-toolkit /tmp/tpch-dbgen /opt/tpch-dbgen

The TPC-H image is derived as follows:


$ cd ~
$ wget https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/Dockerfile
$ sudo docker build -t $ECR_URL/spark_3.1.2_tpch -f ./Dockerfile --build-arg BASE_IMAGE=spark_3.1.2 .
$ sudo docker image ls
REPOSITORY                                                      TAG          IMAGE ID       CREATED          SIZE
[...]
spark_3.1.2                                                     latest       19ca9b6276c9   3 minutes ago    679MB
000000000000.dkr.ecr.us-east-1.amazonaws.com/spark_3.1.2_tpch   latest       5e4f08f28cb5   38 seconds ago   682MB

Before the new image can be pushed to the spark_3.1.2_tpch repository, Docker must be authenticated to the registry:


$ aws ecr get-login-password --region $AWS_REGION | sudo docker login --username AWS --password-stdin $ECR_URL

After the authentication succeeds, the image is published:


$ sudo docker push $ECR_URL/spark_3.1.2_tpch

2.3 Building a custom EMR Image

The spark_3.1.2_tpch image is not compatible with EMR on EKS as it encapsulates open source Spark binaries. Therefore, a similar package that embeds an EMR runtime needs to be created before the demo application can be used in a job run. EMR images are publicly available, the different ECR registry accounts are listed on this page.  For the us-east-1 region, the corresponding repository URL is 755674844232.dkr.ecr.us-east-1.amazonaws.com:


$ export ECR_PUBLIC=755674844232.dkr.ecr.us-east-1.amazonaws.com

An authentication step is required before accessing any official AWS image:


$ aws ecr get-login-password --region $AWS_REGION | sudo docker login --username AWS --password-stdin $ECR_PUBLIC

The EMR equivalent to the spark_3.1.2_tpch image can now be assembled. Because a 3.1.2 distribution was compiled in the previous section, a base image with an emr-6.5.0 tag should be downloaded since the same Spark version is supported by this EMR release:


$ sudo docker pull $ECR_PUBLIC/spark/emr-6.5.0:latest
$ sudo docker image ls
REPOSITORY                                                     TAG          IMAGE ID       CREATED         SIZE
[...]
755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0   latest       77290d3aac6a   6 weeks ago     1.47GB

The Dockerfile that adds the TPC-H kit to the container filesystem is reused, but this time an EMR URI is passed as its base image argument:


$ ls
Dockerfile  spark-3.1.2  spark-3.1.2.tgz
$ sudo docker build -t $ECR_URL/emr_6.5_tpch -f ./Dockerfile --build-arg BASE_IMAGE=$ECR_PUBLIC/spark/emr-6.5.0:latest .
$ sudo docker image ls
REPOSITORY                                                     TAG          IMAGE ID       CREATED              SIZE
[...]
755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0   latest       77290d3aac6a   6 weeks ago          1.47GB
402730444140.dkr.ecr.us-east-1.amazonaws.com/emr_6.5_tpch      latest       779050788b59   About a minute ago   1.47GB

Finally, the derived EMR-TPCH image is uploaded to a dedicated ECR repository:


$ sudo docker push $ECR_URL/emr_6.5_tpch

After the images have been published, the EC2 instance that executed the build instructions is no longer needed and should be terminated.

3. Creating an EKS Cluster

The following paragraphs explain how a Kubernetes cluster can be deployed with EKS. In section 4, a Spark application is submitted to this cluster and in the last part of this article, EMR is registered with it and a job run is initiated.

3.1 Prerequisites

EKS clusters can be launched from local development environments but the command-line tools <span class="monospace">kubectl</span> and <span class="monospace">eksctl</span> need to be installed, more information is provided on this page. The architecture and basic concepts of Kubernetes are described here.

3.2 Cluster Configuration

Our eks-cluster-arm.yaml and eks-cluster-x86.yaml manifests contain the configuration details for a basic EKS cluster.

eks-cluster-arm.yaml


---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: eks-test-cluster
  region: us-east-1 # ToDo: Modify
  version: "1.23"
availabilityZones: [ "us-east-1a", "us-east-1d" ] # ToDo: Modify
vpc:
  nat:
    gateway: HighlyAvailable
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
cloudWatch: # Control Plane logs
  clusterLogging:
    enableTypes: [ "*" ]
iam:
  withOIDC: true
  serviceAccounts:
    - metadata:
        name: cluster-autoscaler
        namespace: kube-system
        labels: { aws-usage: "cluster-ops" }
      wellKnownPolicies:
        autoScaler: true
      roleName: eks-test-autoscaler-role
managedNodeGroups:
  - name: tooling
    instanceType: t3.large
    desiredCapacity: 1
    volumeSize: 20
    labels:
      noderole: tooling
    tags:
      k8s.io/cluster-autoscaler/node-template/label/noderole: tooling
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
nodeGroups:
  - name: driver-group
    availabilityZones: [ "us-east-1d" ] # ToDo: Modify
    desiredCapacity: 1
    volumeSize: 15
    privateNetworking: true
    instanceType: "m6gd.large"
    labels:
      arch: arm64
      noderole: driver
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned"
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV}; mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a
  - name: exec-group
    availabilityZones: [ "us-east-1d" ] # ToDo: Modify
    desiredCapacity: 2
    volumeSize: 15
    privateNetworking: true
    instanceType: "m6gd.2xlarge"
    labels:
      arch: arm64
      noderole: executor
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned"
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV}; mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a


If the application images support ARM architectures, the file with the matching arm infix needs to be downloaded as it specifies worker nodes with this CPU architecture:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-cluster-arm.yaml > eks-test-cluster.yaml

The manifest eks-cluster-x86.yaml should be used for the alternative x86 architecture option:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-cluster-x86.yaml > eks-test-cluster.yaml

A dedicated VPC will be created for the cluster, so only a few lines that reference availability zones and are marked with ToDo comments may have to be modified. The test cluster can be spawned with the <span class="monospace">eksctl</span> command-line tool:


# Make modifications to eks-test-cluster.yaml if required
$ eksctl create cluster -f eks-test-cluster.yaml 

As part of the cluster creation process that is started by the last command, several infrastructure resources are provisioned and configured via CloudFormation. These operations tend to be time-consuming and can take more than half an hour to complete.
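
Progress can be followed in the CloudFormation console or, roughly, from the terminal by listing the stacks that eksctl is still creating; a sketch (the stack names are generated by eksctl):


$ aws cloudformation list-stacks --stack-status-filter CREATE_IN_PROGRESS --query 'StackSummaries[].StackName'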

If the cluster launch is successful, the final log message will contain the following statement:


[✔]  EKS cluster "eks-test-cluster" in "us-east-1" region is ready

In case errors appear on the terminal, it might be necessary to manually delete cluster-related stacks in the CloudFormation console and re-execute the <span class="monospace">eksctl</span> command.

Eventually, an EKS cluster that consists of four EC2 instances will be up and running. The local kubeconfig file should have been updated with an entry for eks-test-cluster, in which case the <span class="monospace">kubectl</span>/<span class="monospace">eksctl</span> CLIs point to the new cluster. If the commands used below return nothing or information of a different cluster, the kubeconfig documentation should be consulted.
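
The active context can be checked and, if necessary, repointed at the new cluster; a minimal sketch, assuming the region and cluster name used throughout this post:


$ kubectl config current-context
$ aws eks update-kubeconfig --region us-east-1 --name eks-test-cluster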


$ kubectl get nodes
NAME                             STATUS   ROLES    AGE   VERSION
ip-192-xxx-xxx-64.ec2.internal   Ready    <none>   18m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-87.ec2.internal   Ready    <none>   18m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-19.ec2.internal   Ready    <none>   23m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-17.ec2.internal   Ready    <none>   20m   v1.23.9-eks-ba74326

The four instances listed in the terminal output above are grouped into three node pools:


$ eksctl get nodegroup --cluster=eks-test-cluster
CLUSTER           NODEGROUP      STATUS            DES. CAP.  INSTANCE TYPE  TYPE
eks-test-cluster  tooling        ACTIVE            1          t3.large       managed
eks-test-cluster  driver-group   CREATE_COMPLETE   1          m6gd.large     unmanaged
eks-test-cluster  exec-group     CREATE_COMPLETE   2          m6gd.2xlarge   unmanaged

System pods that provide core K8s functionality, such as CoreDNS, run on a low-cost t3.large instance in the tooling nodegroup; the Kubernetes control plane itself is managed by EKS. The Spark driver pod will be hosted on a dedicated m6gd.large (or m5d.large for x86) instance. Executor pods can be assigned to two m6gd.2xlarge (or m5d.2xlarge) workers which comprise the exec-group nodegroup. No Spark pods have been launched yet but this will change during the next two sections:


$ kubectl get pods -A
NAMESPACE     NAME                      READY   STATUS    RESTARTS   AGE
kube-system   aws-node-6npz4            1/1     Running   0          27m
kube-system   aws-node-m4xfw            1/1     Running   0          27m
kube-system   aws-node-w2pzq            1/1     Running   0          29m
kube-system   aws-node-wfsxl            1/1     Running   0          32m
kube-system   coredns-7f5998f4c-42m7r   1/1     Running   0          42m
kube-system   coredns-7f5998f4c-hj44f   1/1     Running   0          42m
kube-system   kube-proxy-frtf8          1/1     Running   0          27m
kube-system   kube-proxy-hfmdz          1/1     Running   0          32m
kube-system   kube-proxy-jg7g2          1/1     Running   0          27m
kube-system   kube-proxy-nn4qp          1/1     Running   0          29m
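
The noderole labels from the cluster manifest, which will later steer Spark pod placement, can be confirmed directly on the nodes; the -L flag adds the label values as extra columns (a quick check):


$ kubectl get nodes -L noderole,arch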

The sample program that will run on the cluster was introduced in our previous post; it consists of 22 benchmark queries. To keep the benchmark results comparable, node reclaiming events and up/downscaling will be avoided: all four cluster nodes are provisioned as on-demand instances and, later on, neither dynamic allocation nor autoscaling will be activated. For workloads with less stringent requirements, the use of dynamic scaling and a few spot instances that can host Spark executors tends to be much more cost-effective.

3.3 Mounting NVMe Disks

Both the m6gd and m5d families support instance store volumes, namely local NVMe SSDs. Thanks to their high I/O performance, these devices are very good storage backends for the temporary files that Spark creates. In our previous benchmarks, we have seen significant run time differences between different AWS storage options for shuffle-intensive queries like TPC-H Q18. NVMe-based SSDs are not automatically set up during an EKS cluster launch, which is an important difference to EMR on EC2. For this reason, our cluster manifests contain preBootstrapCommands sections that perform the formatting and disk mounts. After the bootstrapping phase of an EKS cluster has completed, the disk setup can be checked by connecting to one of the nodes with an instance store volume. This requires two changes in the exec-group section of the cluster definition file:

  • The value for the privateNetworking field should be set to false so the associated instances receive a public IPv4 DNS.
  • An ssh entry similar to the following one should be inserted:

       ssh:
         allow: true
         publicKeyName: MyKeyName

After adding an ssh inbound rule to the security group of the target node, we can ssh into it and confirm the volume mounts:


[ec2-user@ip-192-xxx-xxx-87 ~]$ sudo nvme list
Node             SN                   Model                                    Namespace Usage                      Format           FW Rev
---------------- -------------------- ---------------------------------------- --------- -------------------------- ---------------- --------
/dev/nvme0n1     vol068d381d5e4ac15b8 Amazon Elastic Block Store               1         21.47 GB / 21.47 GB        512 B + 0 B      1.0
/dev/nvme1n1     AWS6A06EB0611850CECB Amazon EC2 NVMe Instance Storage         1         474.00 GB / 474.00 GB      512 B + 0 B      0

[ec2-user@ip-192-xxx-xxx-87 ~]$ lsblk
NAME          MAJ:MIN RM   SIZE RO TYPE MOUNTPOINT
nvme0n1       259:0    0    15G  0 disk
├─nvme0n1p1   259:2    0    15G  0 part /
└─nvme0n1p128 259:3    0    10M  0 part /boot/efi
nvme1n1       259:1    0 441.5G  0 disk /local

According to the terminal output, two devices are attached to the instance: The root EBS volume nvme0n1 of size 15 G has two partitions whose mount points are / and /boot/efi. The instance store volume nvme1n1 is mounted at the directory /local. In sections 4 and 5, Spark applications get configured to write shuffle files to a directory under /local so the fast NVMe SSDs are used for the scratch space.

3.4 Access Management

If the demo application were submitted to the Kubernetes cluster at this point, failures would occur as pods have not yet been granted permissions to perform actions on external resources like S3. To avoid this, an IAM policy with the required S3 and logging permissions will be defined.

eks-test-policy.json


{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::$S3_TEST_BUCKET",
        "arn:aws:s3:::$S3_TEST_BUCKET/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams",
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:*:*:*"
      ]
    }
  ]
}

A few environment variables should be declared since subsequent commands make multiple references to account details that change from user to user: S3_TEST_BUCKET holds the name of the S3 bucket under which the application input and results are stored; the account ID is retrieved with a command and assigned to the variable ACCOUNT_ID:


$ export S3_TEST_BUCKET=xonai
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

An IAM policy named eks-test-policy can now be defined easily:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-test-policy.json | envsubst > eks-test-policy.json
$ aws iam create-policy --policy-name eks-test-policy --policy-document file://eks-test-policy.json

Pods do not use an IAM policy directly; the association happens through a Kubernetes service account, which is created as follows:


$ eksctl create iamserviceaccount --name oss-sa --namespace oss --cluster eks-test-cluster --attach-policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy --approve --override-existing-serviceaccounts

This single <span class="monospace">eksctl</span> command accomplishes multiple tasks:

  1. A namespace oss is created.
  2. An IAM role and service account pair is created.
  3. The IAM role is mapped to the service account oss/oss-sa.
  4. The policy eks-test-policy is attached to the newly created role.

In section 4.1, the job manifest configures the Spark pods to use the oss/oss-sa service account so they can enjoy the permissions declared in eks-test-policy. The policy has to be created only once but the <span class="monospace">create iamserviceaccount</span> command needs to be repeated whenever a new EKS cluster is provisioned.
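
Whether the role-to-service-account mapping worked can be verified before submitting anything: the service account should carry an eks.amazonaws.com/role-arn annotation that points at the newly created IAM role. A quick check:


$ eksctl get iamserviceaccount --cluster eks-test-cluster
$ kubectl describe serviceaccount oss-sa -n oss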

3.5 The Kubernetes Dashboard

A useful tool in the Kubernetes ecosystem is the Dashboard, a web-based UI that visualizes a cluster and its K8s objects. Unfortunately, it is not pre-installed on EKS clusters, so a number of steps are required before it can be accessed. The Dashboard can be deployed as follows:


$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.6.1/aio/deploy/recommended.yaml

Accessing the UI is much more involved than installing it; the most straightforward authentication strategy is to use a bearer token. First, the kubectl proxy is started:


$ kubectl proxy
Starting to serve on 127.0.0.1:8001

Copy-pasting the URL below into a browser window opens the Dashboard login view; the port number (8001 here) may have to be adjusted depending on the proxy output:


http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy

To pass through the login page, an authentication token needs to be retrieved from a new terminal window:


$ aws eks get-token --cluster-name eks-test-cluster | jq -r '.status.token'

After pasting the returned string into the Enter Token field of the login page, the UI should appear:

No applications have been submitted yet but the UI is still populated: After the All namespaces option is selected from the drop-down menu near the top-left corner, the Workloads tab displays a few active Kubernetes objects that provide core cluster and Dashboard functionality. When the first benchmark workload is deployed in the next section, a new Jobs entry along with multiple Spark pods will be displayed:

4. Using OSS Spark on EKS

After following the instructions from the last section, an EKS cluster that is configured to access AWS resources will be running. The custom oss Spark image that was created at the beginning of this article can now be used in the first workload.

4.1 The Kubernetes Job Configuration

For our previous benchmark article, we executed TPC-H queries via spark-submit on EC2 and on EMR. The same program can be run on an EKS cluster by integrating it with a Kubernetes Job. Our oss-job.yaml manifest follows this approach: it instantiates a container from the spark_3.1.2_tpch image and invokes spark-submit in it. The most important segments of this job configuration are described in more detail below.

oss-job.yaml


apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: oss-sa
    namespace: oss
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: template-map
  namespace: oss
data:
  driver: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        noderole: driver
      initContainers:
      - name: volume-permissions
        image: public.ecr.aws/docker/library/busybox
        command: ['sh', '-c', 'mkdir /local/scratch; chown -R 185 /local/scratch']
        volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

  executor: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        noderole: executor
      initContainers:
      - name: volume-permissions
        image: public.ecr.aws/docker/library/busybox
        command: ['sh', '-c', 'mkdir /local/scratch; chown -R 185 /local/scratch']
        volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1
---
apiVersion: batch/v1
kind: Job
metadata:
  name: oss-tpch-job
  namespace: oss
spec:
  template:
    spec:
      containers:
        - name: job-container
          image: $OSS_IMAGE_URI
          args: [
            "/bin/sh",
            "-c",
            "/opt/spark/bin/spark-submit \
            --master k8s://https://kubernetes.default.svc.cluster.local:443 \
            --deploy-mode cluster \
            --name oss-tpch-spark \
            --class com.xonai.RunMainTPC \
            --conf spark.dynamicAllocation.enabled=false \
            --conf spark.driver.memory=4G \
            --conf spark.executor.memory=10G \
            --conf spark.executor.cores=3 \
            --conf spark.executor.instances=4 \
            --conf spark.sql.shuffle.partitions=250 \
            --conf spark.kubernetes.container.image=$OSS_IMAGE_URI \
            --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
            --conf spark.kubernetes.container.image.pullPolicy=Always \
            --conf spark.kubernetes.authenticate.driver.serviceAccountName=oss-sa \
            --conf spark.kubernetes.namespace=oss \
            --conf spark.kubernetes.driver.label.spark/component=driver \
            --conf spark.kubernetes.executor.label.spark/component=executor \
            --conf spark.kubernetes.driver.podTemplateFile='/opt/spark/conf/driver_pod_template.yaml' \
            --conf spark.kubernetes.executor.podTemplateFile='/opt/spark/conf/executor_pod_template.yaml' \
            --conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.mount.path='/local/scratch' \
            --conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.options.path='/local/scratch' \
            --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.mount.path='/local/scratch' \
            --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.options.path='/local/scratch' \
            --conf spark.local.dir='/local/scratch' \
            --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
            --conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
            --conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
            s3a://$S3_TEST_BUCKET/cloudbench-assembly-1.0.jar \
            \"s3a://$S3_TEST_BUCKET/tpch_input\" \
            \"s3a://$S3_TEST_BUCKET/tpch_result\" \
            \"/opt/tpch-dbgen\" \
            \"100\"
            \"1\""
          ]
          volumeMounts:
            - name: template-volume
              mountPath: /opt/spark/conf/driver_pod_template.yaml
              subPath: driver
            - name: template-volume
              mountPath: /opt/spark/conf/executor_pod_template.yaml
              subPath: executor
      serviceAccountName: oss-sa
      restartPolicy: Never
      volumes:
        - name: template-volume
          configMap:
            name: template-map
            defaultMode: 420
  backoffLimit: 4

The main program that gets called by spark-submit in line 66 is RunMainTPC.scala. It is packaged into the JAR cloudbench-assembly-1.0.jar; the steps required for creating this file are described in our GitHub repository. <span class="monospace">RunMainTPC</span> expects five arguments which are specified right after the JAR file on lines 91 to 95 of the Job definition.

Not all relevant pod settings can be directly defined with custom flags in the spark-submit command. Therefore, the manifest uses the pod template mechanism for accomplishing two important configuration tasks:

  • Node selectors are declared which constrain the set of worker nodes to which driver and executor pods can be assigned. For example, the driver pod can only be scheduled on an m6gd.large instance (or m5d.large for x86) as no other cluster node was tagged with the label noderole: driver.
  • Init containers are added; they prepare the scratch directory and its permissions so Spark can write temporary data under the mount point of the fast NVMe volume.

The template content is stored as key-value pairs in the data field of the ConfigMap template-map in line 19. This map gets mounted as a volume into the job container: The volumes section of the Job spec (line 106) references template-map and copies its contents into the driver and executor template files whose paths are specified in the volumeMounts field. Spark pods can then read the configuration data from the file mounts.

It is important that the NVMe devices attached to the m6gd.2xlarge (or m5d.2xlarge) instances are used for the Spark scratch space. This can be validated by connecting to a node that hosts executors (see 3.3) and checking its disk space while a query with a large shuffle is executed:


[ec2-user@ip-192-xxx-xxx-89 ~]$ df -H
Filesystem        Size  Used Avail Use% Mounted on
devtmpfs           17G     0   17G   0% /dev
tmpfs              17G     0   17G   0% /dev/shm
tmpfs              17G  1.1M   17G   1% /run
tmpfs              17G     0   17G   0% /sys/fs/cgroup
/dev/nvme0n1p1     17G  3.8G   13G  24% /
/dev/nvme0n1p128   11M  4.0M  6.6M  38% /boot/efi
/dev/nvme1n1      474G   19G  459G   5% /local
tmpfs             3.3G     0  3.3G   0% /run/user/1000

In oss-job.yaml, the application property spark.local.dir is set to /local/scratch in line 86 so temporary data (e.g., shuffle files) are written to this directory. According to the terminal output, the mount point directory /local is associated with the NVMe volume, which held around 19G of Spark scratch data when the df command was entered. This number is higher than the total root volume size and confirms the correctness of the disk setup.

4.2 Running the Kubernetes Job

The batch job described in oss-job.yaml can be deployed as follows:


$ export S3_TEST_BUCKET=xonai
$ export AWS_REGION=us-east-1
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ export OSS_IMAGE_URI=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/spark_3.1.2_tpch:latest
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/oss-job.yaml | envsubst > oss-job.yaml
# Modify the program arguments in lines 91 to 95 if required
$ kubectl apply -f oss-job.yaml

After the last command is entered, a job pod launches almost immediately. Its container invokes the spark-submit process and a Spark driver pod is scheduled. The driver contacts the K8s API server with a request for additional executor pods. After one or two minutes, six pods in total should be up and running in the oss namespace:


$ kubectl get jobs -n oss
NAME           COMPLETIONS   DURATION   AGE
oss-tpch-job   0/1           90s        90s

$ kubectl get pods -n oss
NAME                                                 READY   STATUS    RESTARTS   AGE
oss-tpch-job-dbvzk                                   1/1     Running   0          94s
oss-tpch-spark-5d3d7f83412a3f5c-driver               1/1     Running   0          74s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-1   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-2   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-3   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-4   1/1     Running   0          53s

The names in the terminal output above indicate that four executor pods were launched which corresponds to the <span class="monospace">spark.executor.instances=4</span> setting in the Job spec as one Spark executor runs in one container/pod.
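
That the node selectors from the pod templates were honored can be confirmed with the wide output format, which adds a NODE column to the pod listing (a quick check):


$ kubectl get pods -n oss -o wide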

The Spark web interface can be accessed after forwarding the local port 4040 to the same port on the driver pod:


$ kubectl port-forward oss-tpch-spark-5d3d7f83412a3f5c-driver 4040:4040 -n oss
Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040

The UI can now be viewed at http://localhost:4040/:

For our demo run, a TPC-H scale factor of 100 was used, so the executors need to process 100 GB of input data. This takes around 20 minutes:


$ kubectl get pods -n oss
NAME                                      READY   STATUS      RESTARTS   AGE
oss-tpch-job-dbvzk                        0/1     Completed   0          22m
oss-tpch-spark-5d3d7f83412a3f5c-driver    0/1     Completed   0          22m

After the Spark application has succeeded, the Job object and the driver pod linger on in a completed state. Therefore, the driver log can still be inspected from the command-line:


$ kubectl logs oss-tpch-spark-5d3d7f83412a3f5c-driver -n oss
[...]
   running Thread[benchmark runner,5,main]
   Execution time: 18.987546802s
   Results written to table: 'sqlPerformance' at s3a://xonai/result/timestamp=1663172601987

During the final stage, the application writes a JSON file to S3 that contains execution information like query run times. The Job object needs to be deleted before the application can be rerun:


$ kubectl delete job oss-tpch-job -n oss
job.batch "oss-tpch-job" deleted
$ kubectl delete pod oss-tpch-spark-5d3d7f83412a3f5c-driver -n oss
pod "oss-tpch-spark-5d3d7f83412a3f5c-driver" deleted

5. Using EMR on EKS

Before an EMR job can be submitted to a Kubernetes cluster, a few objects and relationships that facilitate the interplay between the EKS and EMR services need to be created. The next section walks through the necessary setup steps; the last part describes how an EMR workload can be deployed.

5.1 Setting up a virtual cluster

The first step in preparing the existing cluster for EMR on EKS consists in creating a new IAM role that will be used with the virtual cluster:


$ aws iam create-role --role-name emr-test-role --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "eks.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'

The policy that was created at the beginning of 3.4 can be used by more than one IAM identity. After eks-test-policy is attached to the new execution role, it will enjoy the same permissions as the role for the service account oss/oss-sa that was used by the first workload:


$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ aws iam attach-role-policy --role-name emr-test-role --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy
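
The attachment can be verified with a single command; eks-test-policy should appear in the returned list:


$ aws iam list-attached-role-policies --role-name emr-test-role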

In 4.2, the Kubernetes Job was deployed under the oss namespace, so another dedicated namespace should be created now. When EMR is registered with it, a virtual cluster is launched.


$ kubectl create namespace emr

EMR processes still lack permissions to use the new namespace. This can be rectified with an identity mapping command:


$ export AWS_REGION=us-east-1 
$ eksctl create iamidentitymapping --region $AWS_REGION --cluster eks-test-cluster --namespace emr --service-name "emr-containers"

In the penultimate step, a trust relationship between emr-test-role and EMR service accounts is created so that the latter can use the role credentials. It is not necessary to manually set up an EMR-managed service account; this is done automatically upon job run submission.


$ aws emr-containers update-role-trust-policy \
   --cluster-name eks-test-cluster  \
   --namespace emr \
   --role-name emr-test-role
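
The effect of this command can be inspected by printing the role's trust policy, which should now contain a statement that trusts the cluster's OIDC provider on behalf of the EMR service account. A quick check:


$ aws iam get-role --role-name emr-test-role --query Role.AssumeRolePolicyDocument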

Finally, the virtual EMR cluster is launched:


$ aws emr-containers create-virtual-cluster --name emr-test-cluster \
    --container-provider '{
        "id": "'eks-test-cluster'",
        "type": "EKS",
        "info": { "eksInfo": { "namespace": "'emr'" } }
    }'

In the EMR console, the Virtual clusters section at the bottom left should now list a new active cluster:

The EMR console does not (yet) provide a lot of functionality when it comes to orchestrating virtual clusters and job runs; most actions need to originate from the command-line. For example, the following command that will be used in the next section retrieves the details of the virtual cluster:


$ aws emr-containers list-virtual-clusters --query "virtualClusters[?name == 'emr-test-cluster' && state == 'RUNNING']"
[
    {
        "id": "2jf0w2hwzpia1f6yb5hyog9oa",
        "name": "emr-test-cluster",
        "arn": "arn:aws:emr-containers:us-east-1:000000000000:/virtualclusters/2jf0w2hwzpia1f6yb5hyog9oa",
        "state": "RUNNING",
        "containerProvider": {
            "type": "EKS",
            "id": "eks-test-cluster",
            "info": {
                "eksInfo": {
                    "namespace": "emr"
                }
            }
        },
        "createdAt": "2022-09-23T15:45:05+00:00",
        "tags": {}
    }
]

5.2 Scheduling A Job Run

With the EMR on EKS setup described above, it is more convenient to place the pod template contents in files on S3 than to populate and mount a ConfigMap. The two manifests driver_pod_template.yaml and exec_pod_template.yaml contain pod configuration data similar to that of the template-map ConfigMap in the K8s Job specification:

driver_pod_template.yaml


apiVersion: v1
kind: Pod
spec:
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: /local/scratch
  nodeSelector:
    noderole: driver
  containers:
    - name: spark-kubernetes-driver
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /local/scratch
  initContainers:
    - name: volume-permissions
      image: public.ecr.aws/docker/library/busybox
      command: [ 'sh', '-c', 'mkdir /local/scratch; chown -R 999:1000 /local/scratch' ]
      volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

exec_pod_template.yaml


apiVersion: v1
kind: Pod
spec:
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: /local/scratch
  nodeSelector:
    noderole: executor
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /local/scratch
  initContainers:
    - name: volume-permissions
      image: public.ecr.aws/docker/library/busybox
      command: [ 'sh', '-c', 'mkdir /local/scratch; chown -R 999:1000 /local/scratch' ]
      volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

Both template files should be copied to S3:


$ export S3_TEST_BUCKET=xonai
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/driver_pod_template.yaml > driver_pod_template.yaml
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/exec_pod_template.yaml > exec_pod_template.yaml
$ aws s3 cp driver_pod_template.yaml s3://$S3_TEST_BUCKET/
$ aws s3 cp exec_pod_template.yaml s3://$S3_TEST_BUCKET/ 

Our benchmark program can now be submitted to the virtual cluster. The emr-job.json file contains the EMR equivalent of the Kubernetes Job definition from 4.1. While the latter ultimately relies on open-source Spark, the job run initiated below uses EMR release 6.5.0 and the custom image that embeds the EMR runtime:

emr-job.json


{
  "virtualClusterId": "$VIRTUAL_ID",
  "name": "emr-tpch-job",
  "executionRoleArn": "arn:aws:iam::$ACCOUNT_ID:role/emr-test-role",
  "releaseLabel": "emr-6.5.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://$S3_TEST_BUCKET/cloudbench-assembly-1.0.jar",
      "entryPointArguments": [
        "s3://$S3_TEST_BUCKET/tpch_input",
        "s3://$S3_TEST_BUCKET/tpch_result",
        "/opt/tpch-dbgen",
        "100",
        "1"
      ],
      "sparkSubmitParameters": "--name emr-tpch-spark --class com.xonai.RunMainTPC --conf spark.driver.memory=4G --conf spark.executor.cores=3 --conf spark.executor.memory=10G --conf spark.executor.instances=4 --conf spark.sql.shuffle.partitions=250"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.kubernetes.container.image": "$EMR_IMAGE_URI",
          "spark.kubernetes.driver.podTemplateFile": "s3://$S3_TEST_BUCKET/driver_pod_template.yaml",
          "spark.kubernetes.executor.podTemplateFile": "s3://$S3_TEST_BUCKET/exec_pod_template.yaml",
          "spark.dynamicAllocation.enabled": "false",
          "spark.local.dir": "/local/scratch"
        }
      }
    ],
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {
        "logUri": "s3://$S3_TEST_BUCKET/elasticmapreduce/emr-containers"
      }
    }
  }
}

An EMR job run can be started as follows:


$ export S3_TEST_BUCKET=xonai
$ export AWS_REGION=us-east-1 
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ export VIRTUAL_ID=$(aws emr-containers list-virtual-clusters --query "virtualClusters[?name == 'emr-test-cluster' && state == 'RUNNING'].id" --output text)
$ export EMR_IMAGE_URI=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/emr_6.5_tpch:latest

$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/emr-job.json | envsubst > emr-job.json
# Modify the program arguments in lines 10 to 14 if required
$ aws emr-containers start-job-run --cli-input-json file://./emr-job.json
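
The start-job-run response contains an id field; together with the virtual cluster id, it can be used to poll the state of the run from the command-line. A sketch, using the job run id that appears in the listing further below:


$ aws emr-containers describe-job-run --virtual-cluster-id $VIRTUAL_ID --id 000000030rb1lh9ikmj --query jobRun.state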

Similar to the execution of the oss Spark job, six pods in total get scheduled after initiating the job run:


$ kubectl get pods -n emr
NAME                                                 READY   STATUS    RESTARTS   AGE
000000030rb1lh9ikmj-cf268                            3/3     Running   0          2m47s
spark-000000030rb1lh9ikmj-driver                     2/2     Running   0          2m35s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-1   2/2     Running   0          2m3s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-2   2/2     Running   0          2m3s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-3   2/2     Running   0          2m2s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-4   2/2     Running   0          2m2s

Clicking on the emr-test-cluster link in the EMR console’s Virtual Clusters window opens a job run view. The recently launched job should be listed in a running state:

After the application completes, all associated pods are deleted almost immediately, in contrast to the oss counterpart in 4.2. The final status of the job run is displayed in the EMR console and can be fetched from the command-line:


$ kubectl get pods -n emr
No resources found in emr namespace.
$ aws emr-containers list-job-runs --virtual-cluster-id $VIRTUAL_ID
{
    "jobRuns": [
        {
            "id": "000000030rb1lh9ikmj",
            "name": "emr-tpch-job",
            "virtualClusterId": "2jf0w2hwzpia1f6yb5hyog9oa",
            "arn": "arn:aws:emr-containers:us-east-1:000000000000:/virtualclusters/2jf0w2hwzpia1f6yb5hyog9oa/jobruns/000000030rb1lh9ikmj",
            "state": "COMPLETED",
            "executionRoleArn": "arn:aws:iam::000000000000:role/emr-test-role",
            "releaseLabel": "emr-6.5.0-latest",
            "createdAt": "2022-09-23T15:53:02+00:00",
            "finishedAt": "2022-09-23T16:12:19+00:00",
            "stateDetails": "JobRun completed successfully. It ran for 18 Minutes 45 Seconds",
            "tags": {}
        },
[...]

5.3 Cleaning up

After all applications have completed, the virtual cluster and underlying EKS cluster should be deleted:


$ aws emr-containers delete-virtual-cluster --id $VIRTUAL_ID
$ eksctl delete cluster -n eks-test-cluster

Deleting a virtual cluster does not require any operations on infrastructure resources, so the first command completes almost instantly. The termination of the physical EKS cluster involves deprovisioning the nodes and deleting several CloudFormation stacks, which can take a few minutes.
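
The ECR repositories, the IAM policy and the EMR execution role created earlier are not touched by these commands. If they are no longer needed, they can be removed as well; a sketch, assuming the names used throughout this post (the policy can only be deleted once it is detached from all roles):


$ aws ecr delete-repository --repository-name spark_3.1.2_tpch --force
$ aws ecr delete-repository --repository-name emr_6.5_tpch --force
$ aws iam detach-role-policy --role-name emr-test-role --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy
$ aws iam delete-role --role-name emr-test-role
$ aws iam delete-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy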

1. Introduction

Apache Spark on Kubernetes is generally available since release 3.1.0 and is quickly gaining popularity as an alternative to YARN clusters, enabling efficient resource sharing, multi cloud deployments and access to a rich open-source ecosystem of tools ranging from monitoring and management to end-to-end data and machine learning pipelines. On Amazon EMR on EKS, Spark jobs can be deployed on Kubernetes clusters, sharing compute resources across applications with a single set of Kubernetes tools centralizing monitoring and management of infrastructure.

Unfortunately, it is not straightforward to deploy Spark workloads to a Kubernetes cluster that is mostly comprised of Graviton-based instance types: The Spark operator does not seem to be compatible with architectures other than x86 and we encountered problems when using the manifests from official AWS guides. This article walks through an end-to-end example of running a demo application on an EKS cluster with Graviton or x86-based instances and on an EMR virtual cluster: The next section describes how customized Docker images with open-source and EMR runtimes can be created and published to ECR. Sample configuration steps for provisioning an EKS cluster and running an oss Spark job on it are covered in section 3. In the last part of this post, EMR on EKS is enabled and a job run on a virtual cluster is scheduled.

2. Customizing Docker Images

Kubernetes is a container orchestrator so applications need to be containerized before they can be submitted to an EKS cluster. On AWS, container images are distributed with the Elastic Container Registry service. The following subsections show how to build and publish custom Docker images with a Spark distribution and an EMR runtime. In the second half of this article, these images are deployed to an EKS cluster.

2.1 Prerequisites 

Container images are stored in repositories of a registry. For the two custom Docker images that will be used later, two private ECR repositories with the names spark_3.1.2_tpch and emr_6.5_tpch should be created:

We build the images on EC2. The choice of the instance type depends on the processor architecture of the EKS cluster nodes that will host Spark pods: When ARM-based nodes will be provisioned, the m6g.xlarge instance type is more than sufficient for creating the custom images. For nodes with x86 architecture CPUs, an m5.xlarge instance should be launched. Amazon Linux 2 needs to be selected as the machine image and the root volume size should be set to around 20 GiB. When the instance is up and running, we log onto it and install a few dependencies: 


$ sudo yum install docker
$ sudo service docker start
$ sudo amazon-linux-extras enable corretto8
$ sudo yum install java-1.8.0-amazon-corretto-devel
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0-amazon-corretto

It is advisable to set a few environment variables, this shortens subsequent commands for pushing and pulling images:


$ export AWS_REGION=us-east-1
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

The format of a repository URL is determined by its visibility:

  • URLs for private ECR repositories follow the format account-id.dkr.ecr.region.amazonaws.com
  • Public repo URLs start with public.ecr.aws/ followed by an alias 

A variable that stores a private repo URL is defined as follows: 


$ export ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com

2.2 Building a Custom OSS Spark Image

The Apache Spark codebase already contains a Dockerfile that is suitable for creating a base image with a binary distribution from which more specialized images can be derived. As first step, the Spark source code needs to be downloaded and extracted:


$ wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2.tgz
$ tar -xvzf spark-3.1.2.tgz

The AWS EKS guide suggests to build Spark 3.1.2 with a member of the Hadoop 3.3 release line so improvements for Kubernetes and S3 can take effect. One entry of the root POM file has to be modified for this purpose:


$ sed -i 's|<hadoop.version>3.2.0</hadoop.version>|<hadoop.version>3.3.1</hadoop.version>|' spark-3.1.2/pom.xml

A runnable Spark distribution is then composed with the help of the make-distribution script:


$ ll
total 24164
drwxr-xr-x 30 ec2-user ec2-user     4096 May 24  2021 spark-3.1.2
-rw-rw-r--  1 ec2-user ec2-user 24738927 May 24  2021 spark-3.1.2.tgz
  
$ ./spark-3.1.2/dev/make-distribution.sh --name spark-oss --pip --tgz -Phadoop-cloud -Phadoop-3.2 -Pkubernetes -DskipTests

Now the base image can be created:


$ cd spark-3.1.2/dist
$ sudo docker build -f kubernetes/dockerfiles/spark/Dockerfile -t spark_3.1.2 --build-arg java_image_tag=8-jre-slim .
$ sudo docker image ls
REPOSITORY   TAG          IMAGE ID       CREATED              SIZE
spark_3.1.2  latest       19ca9b6276c9   About a minute ago   679MB
openjdk      8-jre-slim   85b121affedd   7 weeks ago          194MB

Since the sample program that is executed later on needs to access the TPC-H kit, a layer with this library should be added to the base image. Our Dockerfile contains all the instructions required for assembling a TPC-H image:


ARG BASE_IMAGE=755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0:latest

FROM amazonlinux:2 as tpc-toolkit

RUN yum group install -y "Development Tools" && \
    git clone https://github.com/databricks/tpch-dbgen.git /tmp/tpch-dbgen && \
    cd /tmp/tpch-dbgen && \
    git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8 -- bm_utils.c  && \
    make OS=LINUX

FROM ${BASE_IMAGE}

COPY --from=tpc-toolkit /tmp/tpch-dbgen /opt/tpch-dbgen

The TPC-H image is derived as follows:


$ cd ~
$ wget https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/Dockerfile
$ sudo docker build -t $ECR_URL/spark_3.1.2_tpch -f ./Dockerfile --build-arg BASE_IMAGE=spark_3.1.2 .
$ sudo docker image ls
REPOSITORY                                                      TAG          IMAGE ID       CREATED          SIZE
[...]
spark_3.1.2                                                     latest       19ca9b6276c9   3 minutes ago    679MB
000000000000.dkr.ecr.us-east-1.amazonaws.com/spark_3.1.2_tpch   latest       5e4f08f28cb5   38 seconds ago   682MB

Before the new image can be pushed to the spark_3.1.2_tpch repository, Docker must be authenticated to the registry:


$ aws ecr get-login-password --region $AWS_REGION | sudo docker login --username AWS --password-stdin $ECR_URL

After the authentication succeeds, the image is published:


$ sudo docker push $ECR_URL/spark_3.1.2_tpch

2.3 Building a custom EMR Image

The spark_3.1.2_tpch image is not compatible with EMR on EKS as it encapsulates open source Spark binaries. Therefore, a similar package that embeds an EMR runtime needs to be created before the demo application can be used in a job run. EMR images are publicly available, the different ECR registry accounts are listed on this page.  For the us-east-1 region, the corresponding repository URL is 755674844232.dkr.ecr.us-east-1.amazonaws.com:


$ export ECR_PUBLIC=755674844232.dkr.ecr.us-east-1.amazonaws.com

An authentication step is required before accessing any official AWS image:


$ aws ecr get-login-password --region $AWS_REGION | sudo docker login --username AWS --password-stdin $ECR_PUBLIC

The EMR equivalent to the spark_3.1.2_tpch image can now be assembled. Because a 3.1.2 distribution was compiled in the previous section, a base image with an emr-6.5.0 tag should be downloaded since the same Spark version is supported by this EMR release:


$ sudo docker pull $ECR_PUBLIC/spark/emr-6.5.0:latest
$ sudo docker image ls
REPOSITORY                                                     TAG          IMAGE ID       CREATED         SIZE
[...]
755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0   latest       77290d3aac6a   6 weeks ago     1.47GB

The Dockerfile that adds the TPC-H kit to the container filesystem is reused, but this time an EMR URI is passed as its base image argument:


$ ls
Dockerfile  spark-3.1.2  spark-3.1.2.tgz
$ sudo docker build -t $ECR_URL/emr_6.5_tpch -f ./Dockerfile --build-arg BASE_IMAGE=$ECR_PUBLIC/spark/emr-6.5.0:latest .
$ sudo docker image ls
REPOSITORY                                                     TAG          IMAGE ID       CREATED              SIZE
[...]
755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-6.5.0   latest       77290d3aac6a   6 weeks ago          1.47GB
402730444140.dkr.ecr.us-east-1.amazonaws.com/emr_6.5_tpch      latest       779050788b59   About a minute ago   1.47GB

Finally, the derived EMR-TPCH image is uploaded to a dedicated ECR repository:


$ sudo docker push $ECR_URL/emr_6.5_tpch

After the images have been published, the EC2 instance that executed the build instructions is no longer needed and should be terminated.

3. Creating an EKS Cluster

The following paragraphs explain how a Kubernetes cluster can be deployed with EKS. In section 4, a Spark application is submitted to this cluster and in the last part of this article, EMR is registered with it and a job run is initiated.

3.1 Prerequisites

EKS clusters can be launched from local development environments but the command-line tools <span class="monospace">kubectl</span> and <span class="monospace">eksctl</span> need to be installed, more information is provided on this page. The architecture and basic concepts of Kubernetes are described here.

3.2 Cluster Configuration

Our eks-cluster-arm.yaml and eks-cluster-x86.yaml manifests contain the configuration details for a basic EKS cluster.

eks-cluster-arm.yaml


---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: eks-test-cluster
  region: us-east-1 # ToDo: Modify
  version: "1.23"
availabilityZones: [ "us-east-1a", "us-east-1d" ] # ToDo: Modify
vpc:
  nat:
    gateway: HighlyAvailable
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
cloudWatch: # Control Plane logs
  clusterLogging:
    enableTypes: [ "*" ]
iam:
  withOIDC: true
  serviceAccounts:
    - metadata:
        name: cluster-autoscaler
        namespace: kube-system
        labels: { aws-usage: "cluster-ops" }
      wellKnownPolicies:
        autoScaler: true
      roleName: eks-test-autoscaler-role
managedNodeGroups:
  - name: tooling
    instanceType: t3.large
    desiredCapacity: 1
    volumeSize: 20
    labels:
      noderole: tooling
    tags:
      k8s.io/cluster-autoscaler/node-template/label/noderole: tooling
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
nodeGroups:
  - name: driver-group
    availabilityZones: [ "us-east-1d" ] # ToDo: Modify
    desiredCapacity: 1
    volumeSize: 15
    privateNetworking: true
    instanceType: "m6gd.large"
    labels:
      arch: arm64
      noderole: driver
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned"
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV}; mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a
  - name: exec-group
    availabilityZones: [ "us-east-1d" ] # ToDo: Modify
    desiredCapacity: 2
    volumeSize: 15
    privateNetworking: true
    instanceType: "m6gd.2xlarge"
    labels:
      arch: arm64
      noderole: executor
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned"
    iam:
      withAddonPolicies:
        ebs: true
        fsx: true
        efs: true
        autoScaler: true
        cloudWatch: true
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV}; mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a


If the application images support ARM architectures, the file with the matching arm infix needs to be downloaded as it specifies worker nodes with this CPU architecture:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-cluster-arm.yaml > eks-test-cluster.yaml

The manifest eks-cluster-x86.yaml should be used for the alternative x86 architecture option:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-cluster-x86.yaml > eks-test-cluster.yaml

A dedicated VPC will be created for the cluster, so only a few lines that reference availability zones and are marked with ToDo comments may have to be modified. The test cluster can be spawned with the <span class="monospace">eksctl</span> command-line tool:


# Make modifications to eks-test-cluster.yaml if required
$ eksctl create cluster -f eks-test-cluster.yaml 

As part of the cluster creation process that is started by the last command, several infrastructure resources are provisioned and configured via CloudFormation. These operations tend to be time consuming and can take more than half an hour to complete. 

If the cluster launch is successful, the final log message will contain the following statement:


[✔]  EKS cluster "eks-test-cluster" in "us-east-1" region is ready

In case errors appear on the terminal, it might be necessary to manually delete cluster-related stacks in the CloudFormation console and re-execute the <span class="monospace">eksctl</span> command.

Eventually, an EKS cluster that consists of four EC2 instances will be up and running. The local kubeconfig file should have been updated with an entry for eks-test-cluster, in which case the <span class="monospace">kubectl</span>/<span class="monospace">eksctl</span> CLIs point to the new cluster. If the commands used below return nothing or information of a different cluster, the kubeconfig documentation should be consulted.


$ kubectl get nodes
NAME                             STATUS   ROLES    AGE   VERSION
ip-192-xxx-xxx-64.ec2.internal   Ready    <none>   18m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-87.ec2.internal   Ready    <none>   18m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-19.ec2.internal   Ready    <none>   23m   v1.23.9-eks-ba74326
ip-192-xxx-xxx-17.ec2.internal   Ready    <none>   20m   v1.23.9-eks-ba74326

The four instances listed in the terminal output above are grouped into three node pools:


$ eksctl get nodegroup --cluster=eks-test-cluster
CLUSTER        NODEGROUP       STATUS             DES. CAP. INSTANCE TYPE  TYPE
eks-test-cluster tooling       ACTIVE             1         t3.large       managed
eks-test-cluster driver-group  CREATE_COMPLETE    1         m6gd.large     unmanaged
eks-test-cluster exec-group    CREATE_COMPLETE    2         m6gd.2xlarge   unmanaged

The control plane and other processes that provide core K8s functionality run on a low cost t3.large instance in the tooling nodegroup. The Spark driver pod will be hosted on a dedicated m6gd.large (or m5d.large for x86) instance. Executor pods can be assigned to two m6gd.2xlarge (or m5d.2xlarge) workers which comprise the exec-group nodegroup. No Spark pods have been launched yet but this will change during the next two sections:


$ kubectl get pods -A
NAMESPACE     NAME                      READY   STATUS    RESTARTS   AGE
kube-system   aws-node-6npz4            1/1     Running   0          27m
kube-system   aws-node-m4xfw            1/1     Running   0          27m
kube-system   aws-node-w2pzq            1/1     Running   0          29m
kube-system   aws-node-wfsxl            1/1     Running   0          32m
kube-system   coredns-7f5998f4c-42m7r   1/1     Running   0          42m
kube-system   coredns-7f5998f4c-hj44f   1/1     Running   0          42m
kube-system   kube-proxy-frtf8          1/1     Running   0          27m
kube-system   kube-proxy-hfmdz          1/1     Running   0          32m
kube-system   kube-proxy-jg7g2          1/1     Running   0          27m
kube-system   kube-proxy-nn4qp          1/1     Running   0          29m

The sample program that will run on the cluster was introduced in our previous post, it consists of 22 benchmark queries. Therefore, node reclaiming events and up/downscaling will be avoided — all four cluster nodes are provisioned as on-demand instances and, later on, neither dynamic allocation nor autoscaling will be activated. For workloads with less stringent requirements, the use of dynamic scaling and a few spot instances that can host Spark executors tends to be much more cost-effective.

3.3 Mounting NVME Disks

Both the m6gd and m5d families support instance store volumes, namely local NVMe SSDs. These devices are very good storage backends for the temporary files that Spark creates due to their high I/O performance. In our previous benchmarks, we have seen significant run time differences between different AWS storage options for shuffle-intensive queries like TPC-H Q18. NVMe-based SSDs are not automatically set up during an EKS cluster launch which is an important difference to EMR on Ec2 . For this reason, our cluster manifests contain preBootstrapCommands sections that perform the formatting and disk mounts. After the bootstrapping phase of an EKS cluster has completed, the disk setup can be checked by connecting to one of the nodes with an instance store volume. This requires two changes in the exec-group section of the cluster definition file:

  • The value for the privateNetworking field should be set to false so the associated instances receive a public IPv4 DNS.
  • An ssh entry similar to the following one should be inserted:

       ssh:
         allow: true
            publicKeyName: MyKeyName

After adding an ssh inbound rule to the security group of the target node, we can ssh into it and confirm the volume mounts:


[ec2-user@ip-192-xxx-xxx-87 ~]$ sudo nvme list
Node SN Model Namespace Usage Format FW Rev 
---------------- -------------------- ---------------------------------------- --------- -------------------------- ---------------- --------
/dev/nvme0n1 vol068d381d5e4ac15b8 Amazon Elastic Block Store 1 21.47 GB / 21.47 GB 512 B + 0 B 1.0 
/dev/nvme1n1 AWS6A06EB0611850CECB Amazon EC2 NVMe Instance Storage 1 474.00 GB / 474.00 GB 512 B + 0 B 0

[ec2-user@ip-192-xxx-xxx-87 ~]$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
nvme0n1 259:0 0 15G 0 disk 
├─nvme0n1p1 259:2 0 15G 0 part /
└─nvme0n1p128 259:3 0 10M 0 part /boot/efi
nvme1n1 259:1 0 441.5G 0 disk /local

According to the terminal output, two devices are attached to the instance: The root EBS volume nvme0n1 of size 15 G has two partitions whose mount points are / and /boot/efi. The instance store volume nvme1n1 is mounted at the directory /local. In sections 4 and 5, Spark applications get configured to write shuffle files to a directory under /local so the fast NVMe SSDs are used for the scratch space.

3.4 Access Management

If the demo application was submitted to the Kubernetes cluster at this point, failures would occur as pods have not been granted permissions yet to perform actions on external resources like S3. To avoid this, an IAM policy with these S3 and logging permissions will be defined.

eks-test-policy.json


{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": [
        "arn:aws:s3:::$S3_TEST_BUCKET",
        "arn:aws:s3:::$S3_TEST_BUCKET/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams",
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:*:*:*"
      ]
    }
  ]
}

A few environment variables should be declared since subsequent commands make multiple references to account details that change from user to user: S3_TEST_BUCKET holds the name of the S3 bucket under which the application input and results are stored, the account id is retrieved with a command and assigned to the variable ACCOUNT_ID:


$ export S3_TEST_BUCKET=xonai
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

An IAM policy named eks-test-policy can now be defined easily:


$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/eks-test-policy.json | envsubst > eks-test-policy.json
$ aws iam create-policy --policy-name eks-test-policy --policy-document file://eks-test-policy.json

Pods do not directly use an IAM policy, the association happens through a Kubernetes service account which is created as follows:


$ eksctl create iamserviceaccount --name oss-sa --namespace oss --cluster eks-test-cluster --attach-policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy --approve --override-existing-serviceaccounts

This single <span class="monospace">eksctl</span> command accomplishes multiple tasks:

  1. A namespace oss is created.
  2. An IAM role and service account pair is created.
  3. The IAM role is mapped to the service account oss/oss-sa.
  4. The policy eks-test-policy is attached to the newly created role.

In section 4.1, the job manifest configures the Spark pods to use the oss/oss-sa service account so they can enjoy the permissions declared in eks-test-policy. The policy has to be created only once but the <span class="monospace>"create iamserviceaccount</span> command needs to be repeated whenever a new EKS cluster is provisioned.

3.5 The Kubernetes Dashboard

A useful tool in the Kubernetes ecosystem is the Dashboard, a web-based UI that visualizes a cluster and its K8s objects. Unfortunately, it is not pre-installed on EKS clusters, so a number of steps are required before it can be accessed. The Dashboard can be deployed as follows:


$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.6.1/aio/deploy/recommended.yaml

Accessing the UI is more involved than installing it; the most straightforward authentication strategy is to use a bearer token. First, the kubectl proxy is started:


$ kubectl proxy
Starting to serve on 127.0.0.1:8001

Copy-pasting the URL below into a browser window opens the Dashboard login view; the port number (8001 here) may have to be adjusted depending on the proxy output:


http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy

To pass through the login page, an authentication token needs to be retrieved from a new terminal window:


$ aws eks get-token --cluster-name eks-test-cluster | jq -r '.status.token'

After pasting the returned string into the Enter Token field of the login page, the UI should appear:

No applications have been submitted yet, but the UI is already populated: After the All namespaces option is selected from the drop-down menu near the top-left corner, the Workloads tab displays a few active Kubernetes objects that provide core cluster and Dashboard functionality. When the first benchmark workload is deployed in the next section, a new Jobs entry along with multiple Spark pods will be displayed:
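
The same overview is also available without the Dashboard; for instance, the system pods that back the cluster and the UI itself can be listed with kubectl:


$ kubectl get pods --all-namespaces
$ kubectl get all -n kubernetes-dashboard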

4. Using OSS Spark on EKS

After following the instructions from the last section, an EKS cluster that is configured to access AWS resources will be running. The custom oss Spark image that was created at the beginning of this article can now be used in the first workload.

4.1 The Kubernetes Job Configuration

For our previous benchmark article, we executed TPC-H queries via spark-submit on EC2 and on EMR. The same program can be run on an EKS cluster by integrating it with a Kubernetes Job. Our oss-job.yaml manifest follows this approach: it instantiates a container from the spark_3.1.2_tpch image and invokes spark-submit inside it. The most important segments of this job configuration are described in more detail below.

oss-job.yaml


apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: oss-sa
    namespace: oss
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: template-map
  namespace: oss
data:
  driver: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        noderole: driver
      initContainers:
      - name: volume-permissions
        image: public.ecr.aws/docker/library/busybox
        command: ['sh', '-c', 'mkdir /local/scratch; chown -R 185 /local/scratch']
        volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

  executor: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        noderole: executor
      initContainers:
      - name: volume-permissions
        image: public.ecr.aws/docker/library/busybox
        command: ['sh', '-c', 'mkdir /local/scratch; chown -R 185 /local/scratch']
        volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1
---
apiVersion: batch/v1
kind: Job
metadata:
  name: oss-tpch-job
  namespace: oss
spec:
  template:
    spec:
      containers:
        - name: job-container
          image: $OSS_IMAGE_URI
          args: [
            "/bin/sh",
            "-c",
            "/opt/spark/bin/spark-submit \
            --master k8s://https://kubernetes.default.svc.cluster.local:443 \
            --deploy-mode cluster \
            --name oss-tpch-spark \
            --class com.xonai.RunMainTPC \
            --conf spark.dynamicAllocation.enabled=false \
            --conf spark.driver.memory=4G \
            --conf spark.executor.memory=10G \
            --conf spark.executor.cores=3 \
            --conf spark.executor.instances=4 \
            --conf spark.sql.shuffle.partitions=250 \
            --conf spark.kubernetes.container.image=$OSS_IMAGE_URI \
            --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
            --conf spark.kubernetes.container.image.pullPolicy=Always \
            --conf spark.kubernetes.authenticate.driver.serviceAccountName=oss-sa \
            --conf spark.kubernetes.namespace=oss \
            --conf spark.kubernetes.driver.label.spark/component=driver \
            --conf spark.kubernetes.executor.label.spark/component=executor \
            --conf spark.kubernetes.driver.podTemplateFile='/opt/spark/conf/driver_pod_template.yaml' \
            --conf spark.kubernetes.executor.podTemplateFile='/opt/spark/conf/executor_pod_template.yaml' \
            --conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.mount.path='/local/scratch' \
            --conf spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.options.path='/local/scratch' \
            --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.mount.path='/local/scratch' \
            --conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.options.path='/local/scratch' \
            --conf spark.local.dir='/local/scratch' \
            --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
            --conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
            --conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
            s3a://$S3_TEST_BUCKET/cloudbench-assembly-1.0.jar \
            \"s3a://$S3_TEST_BUCKET/tpch_input\" \
            \"s3a://$S3_TEST_BUCKET/tpch_result\" \
            \"/opt/tpch-dbgen\" \
            \"100\"
            \"1\""
          ]
          volumeMounts:
            - name: template-volume
              mountPath: /opt/spark/conf/driver_pod_template.yaml
              subPath: driver
            - name: template-volume
              mountPath: /opt/spark/conf/executor_pod_template.yaml
              subPath: executor
      serviceAccountName: oss-sa
      restartPolicy: Never
      volumes:
        - name: template-volume
          configMap:
            name: template-map
            defaultMode: 420
  backoffLimit: 4

The main program that gets called by spark-submit in line 66 is RunMainTPC.scala. It is packaged into the JAR cloudbench-assembly-1.0.jar; the steps required for creating this file are described in our Github repository. RunMainTPC expects five arguments, which are specified right after the JAR file on lines 91 to 95 of the Job definition.

Not all relevant pod settings can be directly defined with custom flags in the spark-submit command. Therefore, the manifest uses the pod template mechanism for accomplishing two important configuration tasks:

  • Node selectors are declared which constrain the set of worker nodes to which driver and executor pods can be assigned. For example, the driver pod can only be scheduled on an m6gd.large instance (or m5d.large for x86) since no other cluster node carries the label noderole: driver (see the sketch after this list).
  • Init containers are added; they prepare the permissions of the mounted volume so Spark can write scratch data under the mount point directory of the fast NVMe volume.
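
The noderole labels referenced by these selectors must already be present on the worker nodes. They can be inspected, and attached to an unlabeled node if necessary, with kubectl; a small sketch in which the node name is a placeholder:


$ kubectl get nodes -L noderole                      # list nodes together with their noderole label values
$ kubectl label nodes <node-name> noderole=executor  # hypothetical: tag a node that is missing the label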

The template content is stored as key-value pairs in the data field of the ConfigMap template-map in line 19. This map gets mounted as a volume into the job container: The volumes section of the Job spec (line 106) references template-map and copies its contents into the driver and executor template files whose paths are specified in the volumeMounts field. Spark pods can then read the configuration data from the file mounts.

It is important that the NVMe devices attached to the m6gd.2xlarge (or m5d.2xlarge) instances are actually used for the Spark scratch space. This can be validated by connecting to a node that hosts executors (see 3.3) and checking its disk usage while a query with a large shuffle is executed:


[ec2-user@ip-192-xxx-xxx-89 ~]$ df -H
Filesystem        Size  Used Avail Use% Mounted on
devtmpfs           17G     0   17G   0% /dev
tmpfs              17G     0   17G   0% /dev/shm
tmpfs              17G  1.1M   17G   1% /run
tmpfs              17G     0   17G   0% /sys/fs/cgroup
/dev/nvme0n1p1     17G  3.8G   13G  24% /
/dev/nvme0n1p128   11M  4.0M  6.6M  38% /boot/efi
/dev/nvme1n1      474G   19G  459G   5% /local
tmpfs             3.3G     0  3.3G   0% /run/user/1000

In oss-job.yaml, the application property spark.local.dir is set to /local/scratch in line 86 so temporary data (e.g., shuffle files) are written to this directory. According to the terminal output, the mount point directory /local is associated with the NVMe volume, which held around 19 G of Spark scratch data when the df command was run. This number exceeds the total root volume size and confirms that the disk setup is correct.
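
To watch the scratch space fill up while a heavy query runs, the usage of the NVMe mount point can be polled on the node; a simple sketch:


$ watch -n 10 df -H /local    # refresh the usage of the NVMe mount every 10 seconds
$ sudo du -sh /local/scratch  # size of the Spark scratch directory at a single point in time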

4.2 Running the Kubernetes Job

The batch job described in oss-job.yaml can be deployed as follows:


$ export S3_TEST_BUCKET=xonai
$ export AWS_REGION=us-east-1
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ export OSS_IMAGE_URI=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/spark_3.1.2_tpch:latest
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/oss-job.yaml | envsubst > oss-job.yaml
# Modify the program arguments in lines 91 to 95 if required
$ kubectl apply -f oss-job.yaml

After the last command is entered, a job pod launches almost immediately. Its container invokes the spark-submit process and a Spark driver pod is scheduled. The driver contacts the K8s API server with a request for additional executor pods. After one or two minutes, six pods in total should be up and running in the oss namespace:


$ kubectl get jobs -n oss
NAME           COMPLETIONS   DURATION   AGE
oss-tpch-job   0/1           90s        90s

$ kubectl get pods -n oss
NAME                                                 READY   STATUS    RESTARTS   AGE
oss-tpch-job-dbvzk                                   1/1     Running   0          94s
oss-tpch-spark-5d3d7f83412a3f5c-driver               1/1     Running   0          74s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-1   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-2   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-3   1/1     Running   0          53s
tpc-cloud-benchmark-100-gb-9458e283412a6389-exec-4   1/1     Running   0          53s

The names in the terminal output above indicate that four executor pods were launched, which corresponds to the spark.executor.instances=4 setting in the Job spec since one Spark executor runs in one container/pod.
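
Whether the node selectors from the pod templates took effect can be confirmed by adding the node column to the pod listing:


$ kubectl get pods -n oss -o wide
# The NODE column should place the driver pod on the node labeled noderole=driver and the executors on noderole=executor nodes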

The Spark web interface can be accessed after forwarding the local port 4040 to the same port on the driver pod:


$ kubectl port-forward oss-tpch-spark-5d3d7f83412a3f5c-driver 4040:4040 -n oss
Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040

The UI can now be viewed at http://localhost:4040/:

For our demo run, a TPC-H scale factor of 100 was used, so the executors need to process 100 GB of input data. This takes around 20 minutes:


$ kubectl get pods -n oss
NAME                                      READY   STATUS      RESTARTS   AGE
oss-tpch-job-dbvzk                        0/1     Completed   0          22m
oss-tpch-spark-5d3d7f83412a3f5c-driver    0/1     Completed   0          22m

After the Spark application has succeeded, the Job object and the driver pod linger on in a completed state. Therefore, the driver log can still be inspected from the command-line:


$ kubectl logs oss-tpch-spark-5d3d7f83412a3f5c-driver -n oss
[...]
   running Thread[benchmark runner,5,main]
   Execution time: 18.987546802s
   Results written to table: 'sqlPerformance' at s3a://xonai/result/timestamp=1663172601987

During the final stage, the application writes a JSON file to S3 that contains execution information like query run times. The Job object needs to be deleted before the application can be rerun:


$ kubectl delete job oss-tpch-job -n oss
job.batch "oss-tpch-job" deleted
$ kubectl delete pod oss-tpch-spark-5d3d7f83412a3f5c-driver -n oss
pod "oss-tpch-spark-5d3d7f83412a3f5c-driver" deleted

5. Using EMR on EKS

Before an EMR job can be submitted to a Kubernetes cluster, a few objects and relationships that facilitate the interplay between the EKS and EMR services need to be created. The next section walks through the necessary setup steps; the last part describes how an EMR workload can be deployed.

5.1 Setting Up a Virtual Cluster

The first step in preparing the existing cluster for EMR on EKS consists in creating a new IAM role that will be used with the virtual cluster:


$ aws iam create-role --role-name emr-test-role --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "eks.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'

The policy that was created at the beginning of 3.4 can be used by more than one IAM identity. After eks-test-policy is attached to the new execution role, that role holds the same permissions as the role behind the service account oss/oss-sa that was used by the first workload:


$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ aws iam attach-role-policy --role-name emr-test-role --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy

In 4.2, the Kubernetes Job was deployed under the oss namespace, so another dedicated namespace should be created now for EMR. Once EMR is registered with it, a virtual cluster can be launched.


$ kubectl create namespace emr

EMR processes still lack permissions to use the new namespace. This can be rectified with an identity mapping command:


$ export AWS_REGION=us-east-1 
$ eksctl create iamidentitymapping --region $AWS_REGION --cluster eks-test-cluster --namespace emr --service-name "emr-containers"
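
The new mapping can be listed to confirm that the emr-containers service was granted access to the namespace:


$ eksctl get iamidentitymapping --cluster eks-test-cluster --region $AWS_REGION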

In the penultimate step, a trust relationship between emr-test-role and EMR service accounts is created so that the latter can use the role credentials. It is not necessary to manually set up an EMR managed service account; this happens automatically upon job run submission.


$ aws emr-containers update-role-trust-policy \
   --cluster-name eks-test-cluster  \
   --namespace emr \
   --role-name emr-test-role
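
The effect of this command can be reviewed by printing the role's updated trust policy, which should now reference the cluster's OIDC provider and the EMR service accounts of the emr namespace:


$ aws iam get-role --role-name emr-test-role --query 'Role.AssumeRolePolicyDocument' --output json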

Finally, the virtual EMR cluster is launched:


$ aws emr-containers create-virtual-cluster --name emr-test-cluster \
    --container-provider '{
        "id": "'eks-test-cluster'",
        "type": "EKS",
        "info": { "eksInfo": { "namespace": "'emr'" } }
    }'

In the EMR console, the Virtual clusters section at the bottom left should now list a new active cluster:

The EMR console does not (yet) provide much functionality for orchestrating virtual clusters and job runs; most actions need to originate from the command-line. For example, the following command, which will be used again in the next section, retrieves the details of the virtual cluster:


$ aws emr-containers list-virtual-clusters --query "virtualClusters[?name == 'emr-test-cluster' && state == 'RUNNING']"
[
    {
        "id": "2jf0w2hwzpia1f6yb5hyog9oa",
        "name": "emr-test-cluster",
        "arn": "arn:aws:emr-containers:us-east-1:000000000000:/virtualclusters/2jf0w2hwzpia1f6yb5hyog9oa",
        "state": "RUNNING",
        "containerProvider": {
            "type": "EKS",
            "id": "eks-test-cluster",
            "info": {
                "eksInfo": {
                    "namespace": "emr"
                }
            }
        },
        "createdAt": "2022-09-23T15:45:05+00:00",
        "tags": {}
    }
]

5.2 Scheduling a Job Run

With the EMR on EKS setup described above, it is more convenient to place the pod template contents in files on S3 than to populate and mount a ConfigMap. The two manifests driver_pod_template.yaml and exec_pod_template.yaml contain pod configuration data similar to the template-map ConfigMap in the K8s Job specification:

driver_pod_template.yaml


apiVersion: v1
kind: Pod
spec:
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: /local/scratch
  nodeSelector:
    noderole: driver
  containers:
    - name: spark-kubernetes-driver
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /local/scratch
  initContainers:
    - name: volume-permissions
      image: public.ecr.aws/docker/library/busybox
      command: [ 'sh', '-c', 'mkdir /local/scratch; chown -R 999:1000 /local/scratch' ]
      volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

exec_pod_template.yaml


apiVersion: v1
kind: Pod
spec:
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: /local/scratch
  nodeSelector:
    noderole: executor
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /local/scratch
  initContainers:
    - name: volume-permissions
      image: public.ecr.aws/docker/library/busybox
      command: [ 'sh', '-c', 'mkdir /local/scratch; chown -R 999:1000 /local/scratch' ]
      volumeMounts:
        - mountPath: /local/scratch
          name: spark-local-dir-1

Both template files should be copied to S3:


$ export S3_TEST_BUCKET=xonai
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/driver_pod_template.yaml > driver_pod_template.yaml
$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/exec_pod_template.yaml > exec_pod_template.yaml
$ aws s3 cp driver_pod_template.yaml s3://$S3_TEST_BUCKET/
$ aws s3 cp exec_pod_template.yaml s3://$S3_TEST_BUCKET/ 

Our benchmark program can now be submitted to the virtual cluster. The emr-job.json file contains the EMR equivalent of the Kubernetes Job definition from 4.1. While the latter ultimately relies on open source Spark, the job run initiated below uses EMR release 6.5.0 and the custom image that embeds the EMR runtime:

emr-job.json


{
  "virtualClusterId": "$VIRTUAL_ID",
  "name": "emr-tpch-job",
  "executionRoleArn": "arn:aws:iam::$ACCOUNT_ID:role/emr-test-role",
  "releaseLabel": "emr-6.5.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://$S3_TEST_BUCKET/cloudbench-assembly-1.0.jar",
      "entryPointArguments": [
        "s3://$S3_TEST_BUCKET/tpch_input",
        "s3://$S3_TEST_BUCKET/tpch_result",
        "/opt/tpch-dbgen",
        "100",
        "1"
      ],
      "sparkSubmitParameters": "--name emr-tpch-spark --class com.xonai.RunMainTPC --conf spark.driver.memory=4G --conf spark.executor.cores=3 --conf spark.executor.memory=10G --conf spark.executor.instances=4 --conf spark.sql.shuffle.partitions=250"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.kubernetes.container.image": "$EMR_IMAGE_URI",
          "spark.kubernetes.driver.podTemplateFile": "s3://$S3_TEST_BUCKET/driver_pod_template.yaml",
          "spark.kubernetes.executor.podTemplateFile": "s3://$S3_TEST_BUCKET/exec_pod_template.yaml",
          "spark.dynamicAllocation.enabled": "false",
          "spark.local.dir": "/local/scratch"
        }
      }
    ],
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {
        "logUri": "s3://$S3_TEST_BUCKET/elasticmapreduce/emr-containers"
      }
    }
  }
}

An EMR job run can be started as follows:


$ export S3_TEST_BUCKET=xonai
$ export AWS_REGION=us-east-1 
$ export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
$ export VIRTUAL_ID=$(aws emr-containers list-virtual-clusters --query "virtualClusters[?name == 'emr-test-cluster' && state == 'RUNNING'].id" --output text)
$ export EMR_IMAGE_URI=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/emr_6.5_tpch:latest

$ curl https://raw.githubusercontent.com/xonai-computing/xonai-benchmarks/main/k8s/emr-job.json | envsubst > emr-job.json
# Modify the program arguments in lines 10 to 14 if required
$ aws emr-containers start-job-run --cli-input-json file://./emr-job.json

As with the oss Spark job, six pods in total get scheduled after the job run is initiated:


$ kubectl get pods -n emr
NAME                                                 READY   STATUS    RESTARTS   AGE
000000030rb1lh9ikmj-cf268                            3/3     Running   0          2m47s
spark-000000030rb1lh9ikmj-driver                     2/2     Running   0          2m35s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-1   2/2     Running   0          2m3s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-2   2/2     Running   0          2m3s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-3   2/2     Running   0          2m2s
tpc-cloud-benchmark-100-gb-4d51e8836b0e1372-exec-4   2/2     Running   0          2m2s

Clicking on the emr-test-cluster link in the EMR console’s Virtual Clusters window opens a job run view. The recently launched job should be listed in a running state:

After the application completes, all associated pods are deleted almost immediately, unlike their oss counterparts in 4.2. The final status of the job run is displayed in the EMR console and can also be fetched from the command-line:


$ kubectl get pods -n emr
No resources found in emr namespace.
$ aws emr-containers list-job-runs --virtual-cluster-id $VIRTUAL_ID
{
    "jobRuns": [
        {
            "id": "000000030rb1lh9ikmj",
            "name": "emr-tpch-job",
            "virtualClusterId": "2jf0w2hwzpia1f6yb5hyog9oa",
            "arn": "arn:aws:emr-containers:us-east-1:000000000000:/virtualclusters/2jf0w2hwzpia1f6yb5hyog9oa/jobruns/000000030rb1lh9ikmj",
            "state": "COMPLETED",
            "executionRoleArn": "arn:aws:iam::000000000000:role/emr-test-role",
            "releaseLabel": "emr-6.5.0-latest",
            "createdAt": "2022-09-23T15:53:02+00:00",
            "finishedAt": "2022-09-23T16:12:19+00:00",
            "stateDetails": "JobRun completed successfully. It ran for 18 Minutes 45 Seconds",
            "tags": {}
        },
[...]
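
Since emr-job.json sets an s3MonitoringConfiguration, the Spark driver and executor logs are also archived under the configured logUri and remain available after the pods are gone. A sketch for locating them (the exact key layout below the prefix depends on the EMR release and on the job run id):


$ aws emr-containers describe-job-run --virtual-cluster-id $VIRTUAL_ID --id 000000030rb1lh9ikmj
$ aws s3 ls s3://$S3_TEST_BUCKET/elasticmapreduce/emr-containers/ --recursive | grep stdout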

5.3 Cleaning Up

After all applications have completed, the virtual cluster and underlying EKS cluster should be deleted:


$ aws emr-containers delete-virtual-cluster --id $VIRTUAL_ID
$ eksctl delete cluster -n eks-test-cluster

Deleting a virtual cluster does not require operations on physical resources, so the first command completes almost instantly. Terminating the underlying EKS cluster involves deprovisioning nodes and deleting CloudFormation stacks, which can take a few minutes.
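
If the IAM and ECR resources created during this walkthrough are no longer needed, they can be removed as well once the EKS cluster deletion has finished; a minimal cleanup sketch (assuming eks-test-policy is no longer attached to any other role):


$ aws iam detach-role-policy --role-name emr-test-role --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy
$ aws iam delete-role --role-name emr-test-role
$ aws iam delete-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/eks-test-policy
$ aws ecr delete-repository --repository-name spark_3.1.2_tpch --force
$ aws ecr delete-repository --repository-name emr_6.5_tpch --force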
