EMR serverless
EMR (Elastic Map Reduce) Serverless is a serverless big data processing service. We can chose an EMR Release and Runtime (Spark, Hive, Presto, etc.) and submit jobs without having to manage any infrastructure. EMR Serverless automatically provisions and scales the compute resources needed to run the jobs and works, and we can also specify the amount of resources we want to allocate for each job. Clusters will be in one region but across availability zones for high availability. We need to still configure worker nodes.
flowchart LR
IAM["IAM user"]
CLI["AWS CLI(for now)"]
ROLE["Job execution role"]
APP["EMR Serverless Application\n(Spark, Hive, etc.)"]
JOB["EMR Job\n(Spark script,\nHive query...)"]
IAM --> CLI
CLI --> APP
ROLE --> APP
JOB --> APP
%% “Notes” as separate nodes; use <br/> for line breaks and keep text on one line
JOB -.-> JOB_NOTE["aws emr-serverless start-job-run<br/>--application-id <application_id><br/>--execution-role-arn <execution_role_arn><br/>--job-driver ..."]
ROLE -.-> ROLE_NOTE["Allow emr-serverless.amazonaws.com service<br/>S3 access for scripts & data<br/>Glue access (for SparkSQL)<br/>KMS keys as needed"]
classDef note fill:#fff5ad,stroke:#999,color:#000,font-size:11px;
class JOB_NOTE,ROLE_NOTE note;
flow:
- Driver reads your code and creates a logical plan.
- Driver breaks the plan into stages and tasks.
- Executors run the tasks in parallel on worker nodes.
- Executors store intermediate data in memory/disk if needed.
- Executors send results back to the driver.
- Driver aggregates results and completes the job.
Pre-Initialized capacity
For Spark, we add 10% overhead to memory request for drivers adn excutors. we need to make sure that the initial capacity is at least 10% more than requested by the job. Example of creating an EMR Serverless application with Spark runtime and specifying the initial capacity for driver and executor workers:
aws emr-serverless create-application \
--type "SPARK" \
--name <"my_application_name"> \
--release-label "emr-6.5.0-preview" \
--initial-capacity '{
"DRIVER": {
"workerCount": 5,
"resourceConfiguration": {
"cpu": "2vCPU",
"memory": "4GB"
}
},
"EXECUTOR": {
"workerCount": 50,
"resourceConfiguration": {
"cpu": "4vCPU",
"memory": "8GB"
}
}
}' \
--maximum-capacity '{
"cpu": "400vCPU",
"memory": "1024GB"
}'
EMR on EKS
Elastic kubernates service, we can use it to run our own kubernates cluster and run our jobs on it.
We can have EMR on EKS, which allows us to run EMR jobs on EKS cluster. Which allows submitting Spark job on Elastic Kubernetes Service without provisioning clusters.
It is fully managed and it allows sharing resources between Spark and other apps on Kubernetes.
Spark
HDFS is a distributed file system designed to run on commodity hardware. Yarn (yet another resource negociator) is a resource management layer for Hadoop that allows multiple applications to share resources in a cluster. MapReduce is a programming model for processing large data sets with a distributed algorithm on a cluster. Spark uses map-reduce but it is more efficient than MapReduce because it can keep data in memory and it has a more flexible programming model.
flowchart TB
HDFS[HDFS]
YARN[YARN]
MR[MapReduce]
SP[Spark]
MR --> YARN
SP --> YARN
YARN --> HDFS
spark components: Speak Streaming, Spark SQL, Spark MLlib, Spark GraphX, Spark RDD (Resilient Distributed Dataset), Spark Core.
flowchart LR
DRIVER["Driver Program\n(Spark Context)"]
CM["Cluster Manager\n(Spark, YARN)"]
EX1["Executor - Cache - Tasks"]
EX2["Executor - Cache - Tasks"]
EX3["Executor - Cache - Tasks"]
DRIVER --> CM
CM --> EX1
CM --> EX2
CM --> EX3
DRIVER <--> EX1
DRIVER <--> EX2
DRIVER <--> EX3
EX1 <--> EX2
EX2 <--> EX3
Spark MLLib
- Classification: logistic regression, naive bayes.
- Rgression
- Decision trees
- Recommemdation engine (ALS)
- Clustering (K-means, Gaussian Mixture)
- LDA (K-Means)
- ML workflow (Pipelines, Cross-validation, Hyperparameter tuning)
- SVD, PCA, statistics
Spark Structured Streaming
Data steams as an unbounded table, we can use SQL to query the data stream. New data is new rows appended to inut table.
Spark Streaming + Kinesis
It can get data from Kinesis stream and process it in real-time. We can use Spark Structured Streaming to read data from Kinesis, perform transformations, and write the results to a sink (e.g., S3, database, etc.). This allows us to build real-time data processing pipelines that can handle high volumes of streaming data.
Zeppelin + spark
It can run spark code interactively (like you can in the spark shell), it can exceute SQL queries against Spark SQL, and it can also visualize the results of the queries. Query results may be visualized in charts and graphs.
Features engineering
Feature engineering is appling our knowledge of the data and the model that we are using to create better features to train your model with. We beed to consider which feature should we use, how to handle missing data, do we need to transform these features in some way? should we create new features from the existing ones?
“Applied machine learning is basically feature engineering” - Andrew Ng. We can’t just throw in raw data and expect good results.
The Curse of Dimensionality
Too many feature features can be a problem and with dimension increase, the volume of the space increases exponentially, and the data becomes sparse.
Each feature is a new dimension. We usually use domain knowledge to select features, and we can also use techniques like PCA (Principal Component Analysis). K-eams to reduce the dimensionality of the data while retaining as much variance as possible. We can also use feature selection techniques to select a subset of the most relevant features for our model.
Preparing Data for TF-IDF on Spark and EMR
TF: Term Frequency - how many times a term appears in a document. It is often normalized by the total number of terms in the document to prevent bias towards longer documents. A word that occurs frequently is probably important to that document.
IDF: Inverse Document Frequency - measures how important a term is in the entire corpus. It is calculated as the logarithm (since word frequencies are distributed exponentially and that gives use better weighting of a words overall popularity) of the total number of documents divided by the number of documents containing the term. The idea is that terms that appear in many documents are less informative than those that appear in fewer documents. The words like “the”, “is”, “and” will have high term frequency but they are not important for the meaning of the document, so they will have low IDF. On the other hand, a word like “machine learning” may have a lower term frequency but it is more informative, so it will have a higher IDF.
TF-IDF is the product of TF and IDF, it gives us a weight for each term in each document, which can be used as features for machine learning models. It helps to identify the most important terms in a document and can be used for tasks like text classification, clustering, and information retrieval. TF-IDF assumes a document is just a “bag of words”. Words can be represneted as a has value fir efficiency. Doing this at scale is the hard part and that’s where Spark comes in.
\mathrm{tfidf}(t, d, D) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t, D)
\mathrm{tf}(t,d) =
\begin{cases}
1 + \log f_{t,d}, & \text{if } f_{t,d} > 0 \\
0, & \text{if } f_{t,d} = 0
\end{cases}
\mathrm{idf}(t, D)
= \log\!\left(\frac{1 + N}{1 + n_t}\right) + 1
\mathrm{idf}_{\text{prob}}(t, D)
= \log\!\left(\frac{N - n_t}{n_t}\right)
Unigram, bigram, n-gram
An extension of TF-IDF is to not only compute relevency for individual words (unigrams) but also for pairs of words (bigrams) or even longer sequences of words (n-grams). This can capture more context and meaning from the text, as certain phrases may be more informative than individual words. For example, “machine learning” as a bigram may be more informative than “machine” and “learning” as separate unigrams. However, using n-grams can also increase the dimensionality of the feature space, so it’s important to balance the benefits with the computational cost.
A very simple search algorithm could be:
- Compute TF-IDF for every word in a corpus.
- For a given search word, sort the documents by their TF-IDF score for the word.
- Display the result.