Usage with Apache Spark on YARN
conda-pack can be used to distribute conda environments for use with Apache Spark jobs when deploying on Apache YARN. By bundling your environment for use with Spark, you can use all the libraries provided by conda and ensure that they're consistently available on every node. This makes use of YARN's resource localization: environments are distributed as archives, which are then automatically unarchived on every node. For this to work, either the tar.gz or zip format must be used.
Python Example
Create an environment:
$ conda create -y -n example python=3.5 numpy pandas scikit-learn
Activate the environment:
$ conda activate example # Older conda versions use `source activate` instead
Package the environment into a tar.gz archive:
$ conda pack -o environment.tar.gz
Collecting packages...
Packing environment at '/Users/jcrist/anaconda/envs/example' to 'environment.tar.gz'
[########################################] | 100% Completed | 23.2s
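If you want to sanity-check the archive before shipping it (an optional step, not part of the original walkthrough), you can confirm that the environment contents sit at the root of the archive, so that ./environment/bin/python resolves once YARN unpacks it. A minimal sketch using Python's standard tarfile module; the exact member names are an assumption about how the archive was written:

import tarfile

# conda-pack places the environment contents (bin/, lib/, ...) at the
# root of the archive, so bin/python should appear as a member.
with tarfile.open("environment.tar.gz", "r:gz") as tar:
    names = tar.getnames()

print(any(name.lstrip("./") == "bin/python" for name in names))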
Write a PySpark script, for example:
# script.py
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setAppName('spark-yarn')
sc = SparkContext(conf=conf)

def some_function(x):
    # Packages are imported and available from your bundled environment.
    import sklearn
    import pandas
    import numpy as np

    # Use the libraries to do work
    return np.sin(x)**2 + 2

rdd = (sc.parallelize(range(1000))
         .map(some_function)
         .take(10))

print(rdd)
Submit the job to Spark using spark-submit. The #environment suffix on the --archives argument tells YARN to unpack the archive into a directory named environment inside each container's working directory, which is why PYSPARK_PYTHON is set to ./environment/bin/python. In YARN cluster mode:
$ PYSPARK_PYTHON=./environment/bin/python \
spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn \
--deploy-mode cluster \
--archives environment.tar.gz#environment \
script.py
Or in YARN client mode:
$ PYSPARK_DRIVER_PYTHON=`which python` \
PYSPARK_PYTHON=./environment/bin/python \
spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn \
--deploy-mode client \
--archives environment.tar.gz#environment \
script.py
You can also start a PySpark interactive session using the following:
$ PYSPARK_DRIVER_PYTHON=`which python` \
PYSPARK_PYTHON=./environment/bin/python \
pyspark \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn \
--deploy-mode client \
--archives environment.tar.gz#environment
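As an optional check that is not part of the original example, you can run a small job from the interactive session (or from the end of script.py) to confirm that the executors are using the bundled environment; describe_worker here is a hypothetical helper name:

def describe_worker(_):
    # Runs on the executors: report the interpreter and numpy build in use.
    import sys
    import numpy
    return (sys.executable, numpy.__version__)

print(sc.parallelize(range(4), 2).map(describe_worker).distinct().collect())

The reported interpreter paths should point into the unpacked environment directory in each container.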
R Example
Conda also supports R environments. Here we’ll demonstrate creating and packaging an environment for use with Sparklyr. Note that similar techniques also work with SparkR.
First, create an environment:
$ conda create -y -n example r-sparklyr
Activate the environment:
$ conda activate example # Older conda versions use `source activate` instead
Package the environment into a tar.gz archive. Note the addition of the -d ./environment flag. This tells conda-pack to rewrite any prefixes to the path ./environment (the relative path to the environment from the working directory on the YARN workers) before packaging. This is required for R, as the R executables have absolute paths hardcoded in them (whereas Python does not).
$ conda pack -o environment.tar.gz -d ./environment
Collecting packages...
Packing environment at '/Users/jcrist/anaconda/envs/example' to 'environment.tar.gz'
[########################################] | 100% Completed | 21.8s
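As an optional sanity check (not part of the original walkthrough), you can verify that the prefix rewrite took effect by reading one of the R launcher scripts out of the archive and looking for the ./environment prefix. A minimal sketch using Python's tarfile module; the member name bin/R is an assumption about the archive layout:

import tarfile

# Pull the R launcher script out of the packed archive and check that it
# refers to the ./environment prefix passed via `conda pack -d`.
with tarfile.open("environment.tar.gz", "r:gz") as tar:
    launcher = tar.extractfile("bin/R").read().decode()

print("./environment" in launcher)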
Write an R script, for example:
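# script.R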
library(sparklyr)
# Create a spark configuration
config <- spark_config()
# Specify that the packaged environment should be distributed
# and unpacked to the directory "environment"
config$spark.yarn.dist.archives <- "environment.tar.gz#environment"
# Specify the R command to use, as well as various R locations on the workers
config$spark.r.command <- "./environment/bin/Rscript"
config$sparklyr.apply.env.R_HOME <- "./environment/lib/R"
config$sparklyr.apply.env.RHOME <- "./environment"
config$sparklyr.apply.env.R_SHARE_DIR <- "./environment/lib/R/share"
config$sparklyr.apply.env.R_INCLUDE_DIR <- "./environment/lib/R/include"
# Create a spark context.
# You can also specify `master = "yarn-cluster"` for cluster mode.
sc <- spark_connect(master = "yarn-client", config = config)
# Use a user defined function, which requires a working R environment on
# every worker node. Since all R packages already exist on every node, we
# pass in ``packages = FALSE`` to avoid redistributing them.
sdf_copy_to(sc, iris) %>%
  spark_apply(function(e) broom::tidy(lm(Petal_Length ~ Petal_Width, e)),
              packages = FALSE)
Run the script:
$ Rscript script.R
# Source: table<sparklyr_tmp_12de794b4e2a> [?? x 5]
# Database: spark_connection
  Sepal_Length Sepal_Width Petal_Length Petal_Width  Species
         <chr>       <dbl>        <dbl>       <dbl>    <dbl>
1  (Intercept)        1.08       0.0730        14.8 4.04e-31
2  Petal_Width        2.23       0.0514        43.4 4.68e-86