Large-scale machine learning poses many challenges. The main ones are:

  1. Data management: Handling and processing large volumes of data efficiently is a significant challenge. This includes data storage, retrieval, and preprocessing. It also covers how to extract features from the raw data and feed them to the model.
  2. Model complexity: As the size of the data increases, the complexity of the model may also need to increase to capture the underlying patterns. Nowadays, models can reach hundreds of billions of parameters. This can lead to longer training times and increased computational resources.
  3. Measurement and evaluation: Evaluating the performance of large-scale machine learning models can be challenging, especially when dealing with imbalanced datasets or when the model is deployed in a real-world setting. It is important to have robust evaluation metrics and techniques to ensure that the model performs well and contributes to business value.
  4. Scalability: Ensuring that the machine learning system can scale effectively during the inference phase is crucial. This involves optimizing the model for efficient inference, managing resources effectively, and ensuring that the system can handle a large number of requests without significant latency.

Data Ingestion

Streaming Ingestion: This involves continuously collecting and processing data in real time. It is essential for applications that require up-to-date information, such as recommendation systems or fraud detection. A classic architecture on AWS looks like this:

flowchart TD
    subgraph SRC["Event sources"]
        A[Web / app]
        B[Backend services]
        C[IoT / sensors]
    end

    subgraph INGEST["Ingestion — AWS Kinesis"]
        D[Kinesis Data Streams]
        E[Kinesis Firehose]
    end

    subgraph STORE["Storage — source of truth"]
        F[(S3 raw data lake)]
    end

    subgraph FEAT["Feature engineering"]
        G[Glue ETL]
        H[Athena]
        I[Data Wrangler]
    end

    subgraph FS["Feature store"]
        J[SageMaker Feature Store]
    end

    subgraph TRAIN["Training"]
        K[SageMaker Training]
        L[SM Pipelines]
        M[SM Experiments]
    end

    subgraph DEPLOY["Deployment"]
        N[Model Registry]
        O[Real-time endpoint / Batch Transform]
    end

    subgraph OBS["Observability — read only"]
        P[OpenSearch]
        Q[Kibana dashboards]
    end

    A --> D
    B --> D
    C --> E
    D --> F
    E --> F
    D -. read only .-> P
    P --> Q
    F --> G
    F --> H
    F --> I
    G --> J
    H --> J
    I --> J
    J --> K
    J --> L
    J --> M
    K --> N
    L --> N
    M --> N
    N --> O
    O -. feedback loop .-> F

There are basically three types of streaming ingestion:

Clickstream Ingestion

These are the high-frequency events generated by users interacting with a website, an application, or a voice assistant. This data can be used for various purposes, such as analyzing user behavior, personalizing content, and improving user experience. The frequency can potentially reach millions of events per second. We often capture information such as:

  • Timestamp: The time at which the event occurred.
  • User agent: Information about the user’s device and browser.
  • IP address: The user’s IP address, which can provide insights into their location and network.
  • User ID: A unique identifier if the user is logged in.
  • Request details: How the user used the endpoints.
  • Session ID: Usually stored in a cookie; it helps tie clickstream entries together.
  • Referrer: how the user arrived at the website (e.g., search engine, social media, direct link).
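
As a minimal sketch of what one such record could look like on the wire, the snippet below models the fields listed above as a dataclass and serializes it to JSON. The class name `ClickEvent`, the field names, and the sample values are assumptions for illustration, not a schema prescribed by any particular tool.

```python
import json
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class ClickEvent:
    """One clickstream record; field names are illustrative only."""
    timestamp: str                   # ISO-8601 time at which the event occurred
    user_agent: str                  # device and browser information
    ip_address: str
    session_id: str                  # ties the entries of one visit together
    request_path: str                # which endpoint the user hit
    referrer: Optional[str] = None   # how the user arrived, if known
    user_id: Optional[str] = None    # None when the user is not logged in


def to_message(event: ClickEvent) -> bytes:
    """Serialize an event to UTF-8 JSON, a common payload for a stream record."""
    return json.dumps(asdict(event), sort_keys=True).encode("utf-8")


event = ClickEvent(
    timestamp="2024-01-01T12:00:00Z",
    user_agent="Mozilla/5.0",
    ip_address="203.0.113.7",
    session_id="sess-42",
    request_path="/products/123",
    referrer="https://www.example.com/search",
)
payload = to_message(event)
```

The resulting `payload` is what a producer would hand to the streaming tool; a compact binary format could replace JSON when volume is very high.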

Tools for clickstream ingestion include Apache Kafka, Amazon Kinesis, and Google Cloud Pub/Sub. These tools can handle high volumes of data and provide real-time processing capabilities.

flowchart LR
    subgraph Producers
        P1[Clickstream Logs]
        P2[Clickstream Logs]
        P3[Clickstream Logs]
    end

    subgraph Kafka or Kinesis
        B[Broker Cluster]
    end

    subgraph Consumers
        C1[Storage]
        C2[Storage]
        C3[Storage]
    end

    P1 --> B
    P2 --> B
    P3 --> B

    B --> C1
    B --> C2
    B --> C3

In the broker cluster, data is partitioned so that it can be distributed across multiple brokers. Traditionally a ZooKeeper ensemble acts as the coordination service: it manages cluster metadata and leader election, tracks the state of the brokers, and helps ensure high availability. In modern Kafka deployments, ZooKeeper is replaced by KRaft mode, where Kafka manages its own metadata using Raft consensus.
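
Partitioning by key can be sketched as follows. This is only an illustration of the idea: `zlib.crc32` stands in for whatever hash function the real client library uses, and `NUM_PARTITIONS` and the session IDs are made-up values.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical partition count


def choose_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition with a stable hash.

    crc32 is a stand-in here; real clients use their own hash function.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


events = [
    ("sess-1", "click"),
    ("sess-2", "view"),
    ("sess-1", "scroll"),
    ("sess-3", "click"),
]

# Group events by the partition their session ID hashes to.
partitions = defaultdict(list)
for session_id, action in events:
    partitions[choose_partition(session_id)].append((session_id, action))
```

Because the hash is stable, all events of one session land in the same partition, which preserves their relative order for downstream consumers.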

Change Data Capture (CDC)

Assume that we have a database that tracks who is subscribed to Netflix. It contains:

  • User ID: A unique identifier for each user.
  • is_active_subscribed: A boolean indicating whether the user is currently subscribed.
  • renewal_date: The date when the user’s subscription is set to renew.

We want to model whether someone will renew or not. In this case we care about both the current state and the previous state of the user. For example, to know when a user cancelled, we need to see that the user was subscribed before and is not subscribed now.

To get this, we use the log of changes to rows in a particular table and send those change logs to a streaming tool such as Kafka, Kinesis, or Pub/Sub. We can then use the change logs to update features in real time and feed them to the model for inference. This is important for applications that require up-to-date information, such as recommendation systems or fraud detection.

This brings us back to the producer, broker, and consumer architecture. The producer is the database: when the column is_active_subscribed changes, the related user data is sent to the broker cluster, and the consumer then stores it for building a model that predicts whether the user will renew.

flowchart LR
    subgraph Producers
        P[Online Transactional Processing OLTP Database]
    end

    subgraph Broker
        B[Broker Cluster]
    end

    subgraph Consumers
        C[Online Analytical Processing OLAP Database]
    end

    P --> B
    B --> C
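
The value of keeping both the previous and the current state can be sketched with a small change record. The `ChangeEvent` shape with `before` and `after` fields is an assumption for illustration; real CDC tools emit their own formats.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SubscriptionRow:
    user_id: str
    is_active_subscribed: bool
    renewal_date: str


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical change record: the row before and after one write."""
    before: Optional[SubscriptionRow]   # None for an insert
    after: Optional[SubscriptionRow]    # None for a delete


def is_cancellation(change: ChangeEvent) -> bool:
    """True when a user went from subscribed to not subscribed."""
    return (
        change.before is not None
        and change.after is not None
        and change.before.is_active_subscribed
        and not change.after.is_active_subscribed
    )


change_log = [
    # u1 cancels: subscribed before, not subscribed after.
    ChangeEvent(
        before=SubscriptionRow("u1", True, "2024-02-01"),
        after=SubscriptionRow("u1", False, "2024-02-01"),
    ),
    # u2 renews: only the renewal date moves.
    ChangeEvent(
        before=SubscriptionRow("u2", True, "2024-02-10"),
        after=SubscriptionRow("u2", True, "2024-03-10"),
    ),
    # u3 signs up: there is no previous row.
    ChangeEvent(before=None, after=SubscriptionRow("u3", True, "2024-03-01")),
]

cancelled_users = [c.after.user_id for c in change_log if is_cancellation(c)]
```

A snapshot of the table alone would only show that u1 is inactive; the change log is what reveals the transition.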

Live Video Ingestion

We ingest video content from traffic cameras, security cameras, and video streaming services. An example is HTTP Live Streaming (HLS), a protocol for streaming video content over the internet. It typically uses H.264 compression for video and AAC for audio, chops the video into short segments, and sends them out over HTTP.
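
To make the segmenting idea concrete, here is a rough sketch that builds the text of an HLS media playlist for a video of a given length. The function name, the 6-second segment length, and the `segmentNNNNN.ts` file names are assumptions; the actual encoding and cutting of the video is out of scope.

```python
import math


def build_media_playlist(total_seconds: float, segment_seconds: int = 6) -> str:
    """Return the text of a simple HLS media playlist (.m3u8).

    Only the playlist is produced; the segment files are assumed to exist.
    """
    n_segments = math.ceil(total_seconds / segment_seconds)
    lines = [
        "#EXTM3U",
        "#EXT-X-VERSION:3",
        f"#EXT-X-TARGETDURATION:{segment_seconds}",
        "#EXT-X-MEDIA-SEQUENCE:0",
    ]
    remaining = total_seconds
    for i in range(n_segments):
        duration = min(segment_seconds, remaining)   # the last segment may be shorter
        lines.append(f"#EXTINF:{duration:.3f},")
        lines.append(f"segment{i:05d}.ts")           # hypothetical segment file name
        remaining -= duration
    lines.append("#EXT-X-ENDLIST")
    return "\n".join(lines)


playlist = build_media_playlist(total_seconds=20, segment_seconds=6)
```

A player fetches this playlist over HTTP and then requests the listed segments one by one, which is why ordinary web infrastructure is enough to serve the stream.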

It also fits the producer, broker, and consumer architecture: the producer is the video source, the broker cluster handles the video stream and distributes it to consumers, and the consumers process and analyze the video data.

Ingestion considerations

  • Size of individual data: clickstream vs video stream
  • Rate at which data comes in: subscriptions vs clickstream
  • Support of data types (changing data types)
  • High-availability (multi-AZ) and fault tolerance (replication, retry mechanism)

Data Storage

Distributed File Systems

HDFS (Hadoop Distributed File System) is a distributed file system that provides high-throughput access to application data. It is designed to store and manage large volumes of data across a cluster of commodity hardware. HDFS is a key component of the Hadoop ecosystem and is widely used for big data storage and processing.

It is composed of two main components: the NameNode and the DataNodes. The NameNode is responsible for managing the metadata of the file system, while the DataNodes are responsible for storing the actual data. HDFS provides fault tolerance by replicating data across multiple DataNodes, ensuring that data is not lost in case of hardware failures.

For a big file, the client will partition it into blocks (128 MB by default) and store them across different DataNodes. The NameNode keeps track of the location of each block and manages the file system namespace.
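
The block split and the NameNode's block map can be sketched as below. The round-robin placement and the DataNode names are simplifying assumptions; real HDFS placement is rack-aware and decided by the NameNode.

```python
BLOCK_SIZE = 128 * 1024 * 1024  # 128 MB, the default block size mentioned above


def split_into_blocks(file_size: int, block_size: int = BLOCK_SIZE) -> list:
    """Return the byte length of each block for a file of file_size bytes."""
    full, rest = divmod(file_size, block_size)
    return [block_size] * full + ([rest] if rest else [])


def place_blocks(n_blocks: int, datanodes: list, replication: int = 3) -> dict:
    """Toy round-robin placement of each block's replicas on distinct nodes."""
    return {
        b: [datanodes[(b + r) % len(datanodes)] for r in range(replication)]
        for b in range(n_blocks)
    }


blocks = split_into_blocks(300 * 1024 * 1024)   # a 300 MB file
block_map = place_blocks(len(blocks), ["dn1", "dn2", "dn3", "dn4"])
```

Here the 300 MB file becomes two full blocks plus a 44 MB tail, and `block_map` plays the role of the metadata the NameNode keeps.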

For availability, there are two NameNodes: one active and one standby. The active NameNode handles all client requests, while the standby NameNode keeps exactly the same metadata and is ready to take over in case of failure. Data blocks are also replicated across multiple DataNodes to ensure data durability and availability. A ZooKeeper cluster monitors the health of the NameNodes and manages the failover process if the active NameNode fails.

Hadoop does not always have to replicate the entire data. Since Hadoop 3, it can use erasure coding instead of simple replication to provide fault tolerance. The idea is similar to an XOR operation: the data is divided into blocks and encoded with parity information. HDFS uses Reed-Solomon encoding, which allows data recovery even if some blocks are lost. This approach is more storage-efficient than simple replication, as it reduces the amount of redundant data stored while still providing fault tolerance.
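
The XOR intuition can be shown in a few lines. This is plain single-parity XOR, not Reed-Solomon; it only tolerates the loss of one block, and the block contents are made up.

```python
from functools import reduce


def xor_blocks(blocks: list) -> bytes:
    """XOR equal-length byte blocks together, byte by byte."""
    return bytes(reduce(lambda a, b: a ^ b, column) for column in zip(*blocks))


data_blocks = [b"AAAA", b"BBBB", b"CCCC"]
parity = xor_blocks(data_blocks)          # one extra block instead of full copies

# Simulate losing the second block, then rebuild it from the survivors and parity.
survivors = [data_blocks[0], data_blocks[2], parity]
recovered = xor_blocks(survivors)
```

Three data blocks are protected by one parity block here, whereas three-way replication would store six extra blocks. Reed-Solomon generalizes this to several parity blocks so that more than one loss can be repaired.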

Data completeness and consistency

How do we make sure that all data sent by the producer ends up stored on the consumer side? Normally the producer sends a message to the broker cluster, and the broker cluster sends an acknowledgment back to the producer once the message is successfully stored. The producer uses this acknowledgment to confirm that the data has been ingested and durably stored by the broker.

If the producer does not receive an acknowledgment within a certain time frame, because of a network problem or a broker failure, it can retry sending the message until it receives a confirmation of successful storage. This mechanism helps ensure data completeness and reliability in the ingestion process.

But if the connection fails after the broker has received the message and before it has sent the acknowledgment, the producer may resend the same message, which can lead to duplicate data on the consumer side. If the consumer is dealing with money transactions, this can cause serious issues. To handle this, we can use idempotent producers, which attach a unique identifier (in Kafka, a producer ID plus a sequence number) to each message. The broker can then use this identifier to detect and discard duplicate messages, ensuring that only one copy of each message is stored.
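
The retry-plus-deduplication behaviour can be sketched with a toy broker. The `Broker` class, `send_with_retry`, and the simulated lost acknowledgment are all assumptions for illustration and do not mirror any real client API.

```python
class Broker:
    """Toy broker that drops retried messages it has already stored."""

    def __init__(self):
        self.log = []
        self.last_seq = {}   # producer_id -> highest sequence number stored

    def append(self, producer_id: str, seq: int, value: str) -> bool:
        """Store the message unless this (producer, seq) was already seen.

        The return value plays the role of the acknowledgment.
        """
        if seq <= self.last_seq.get(producer_id, -1):
            return True      # duplicate from a retry: acknowledge, do not store again
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return True


def send_with_retry(broker, producer_id, seq, value, ack_lost_times=1):
    """Resend until an ack arrives; the first ack_lost_times acks are 'lost'."""
    attempts = 0
    while True:
        attempts += 1
        acked = broker.append(producer_id, seq, value)
        if acked and attempts > ack_lost_times:
            return attempts


broker = Broker()
attempts = send_with_retry(broker, "producer-1", 0, "pay $10")   # ack lost once
send_with_retry(broker, "producer-1", 1, "pay $5", ack_lost_times=0)
```

The first payment is sent twice because its acknowledgment was lost, yet it appears only once in `broker.log`.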

Kafka transactions go further: each producer is given a unique transactional ID that is tied to its metadata, so that every batch of data committed to the broker is complete. The Kafka Streams API simplifies the code needed for this. An HDFS sink connector can be used to write data from the broker cluster to the consumer side, and it also supports exactly-once semantics to ensure data consistency.

Storage formats

Avro

Avro is a row-based storage format that is compact, fast, and suitable for serialization.

  • Row oriented storage: Avro stores data in a row-oriented format, which means that all the fields of a record are stored together.
  • Good for queries which need all columns.
  • Good for heavy write loads, as it is optimized for fast serialization and deserialization.
  • Schema evolution: The schema is defined in JSON and supports evolution, which means that you can add new fields (with default values) to the schema without breaking existing data. This is particularly useful in streaming applications where the data schema may evolve over time.
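
Schema evolution can be illustrated without an Avro library. The two schemas below follow Avro's JSON schema syntax, but `resolve` is a deliberately simplified stand-in for Avro's schema resolution, and the `Click` record and its fields are made up.

```python
import json

# Writer schema (v1) and reader schema (v2); v2 adds a field with a default.
SCHEMA_V1 = json.loads("""
{"type": "record", "name": "Click", "fields": [
  {"name": "user_id", "type": "string"},
  {"name": "url", "type": "string"}
]}
""")
SCHEMA_V2 = json.loads("""
{"type": "record", "name": "Click", "fields": [
  {"name": "user_id", "type": "string"},
  {"name": "url", "type": "string"},
  {"name": "referrer", "type": ["null", "string"], "default": null}
]}
""")


def resolve(record: dict, reader_schema: dict) -> dict:
    """Project a decoded record onto the reader schema, filling in defaults."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            resolved[name] = field["default"]
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return resolved


old_record = {"user_id": "u1", "url": "/home"}   # written with v1
upgraded = resolve(old_record, SCHEMA_V2)        # read with v2
```

Old data stays readable because the new field carries a default; a new field without a default would make `resolve` fail on old records.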

Parquet

  • Column-oriented storage: Parquet stores data in a column-oriented format, which means that all the values of a particular field are stored together.
  • Good for queries which need only some columns.
  • Good for heavy read loads.
  • Schema evolution is harder: adding new fields to the schema can be more complex and may require additional handling to ensure compatibility with existing data. This can be a consideration in streaming applications where the data schema may evolve over time.
  • Good for sparse data, as it can efficiently store and compress data with many null values. This is particularly beneficial in scenarios where the dataset contains a large number of optional fields or when the data is not uniformly distributed across all columns.
  • Good compression performance.
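
The difference between the two layouts can be sketched in pure Python. This only mimics the idea of row versus column storage; the records, the `to_columns` helper, and the run-length encoding are illustrative assumptions, not how Avro or Parquet files are actually laid out on disk.

```python
rows = [
    {"user_id": "u1", "country": "FR", "watch_minutes": 30},
    {"user_id": "u2", "country": "FR", "watch_minutes": 45},
    {"user_id": "u3", "country": "FR", "watch_minutes": 10},
    {"user_id": "u4", "country": "DE", "watch_minutes": 25},
]


def to_columns(records: list) -> dict:
    """Pivot row-oriented records into one list of values per field."""
    columns = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns


def run_length_encode(values: list) -> list:
    """Collapse runs of repeated values into [value, count] pairs."""
    encoded = []
    for v in values:
        if encoded and encoded[-1][0] == v:
            encoded[-1][1] += 1
        else:
            encoded.append([v, 1])
    return encoded


columns = to_columns(rows)
total_from_rows = sum(r["watch_minutes"] for r in rows)   # walks every full record
total_from_columns = sum(columns["watch_minutes"])        # reads a single column
country_rle = run_length_encode(columns["country"])
```

Both totals are equal, but the column layout only has to read one field, and similar values stored together compress well, as `country_rle` shows.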

Data Processing

Once we have collected the data, we need to process it to extract features and feed them to the model for training and inference. That includes:

  • Aggregation: This involves summarizing and combining data to create meaningful features. For example, calculating the average time spent on a website or the total number of purchases made by a user.
  • Joins: This involves combining data from different sources to create a more comprehensive dataset. For example, joining user demographic data with clickstream data to create features for a recommendation system.
  • Transformations: This involves converting raw values into a form the model can use. For example, if we only need the month of the year instead of the full date, we can extract the month from the date. Other transformations include normalization, standardization, encoding categorical variables, and handling missing values.
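
The three operations can be sketched on a tiny in-memory dataset. The records and field names are made up, and at scale the same logic would run in a distributed framework rather than in plain Python.

```python
from collections import defaultdict
from datetime import date

clicks = [
    {"user_id": "u1", "day": date(2024, 3, 5), "seconds_on_page": 40},
    {"user_id": "u1", "day": date(2024, 3, 6), "seconds_on_page": 20},
    {"user_id": "u2", "day": date(2024, 7, 1), "seconds_on_page": 90},
]
users = {"u1": {"country": "FR"}, "u2": {"country": "DE"}}

# Aggregation: average time spent on a page per user.
seconds_by_user = defaultdict(list)
for click in clicks:
    seconds_by_user[click["user_id"]].append(click["seconds_on_page"])
avg_seconds = {u: sum(v) / len(v) for u, v in seconds_by_user.items()}

# Join: attach demographic data to the aggregated clickstream features.
features = {u: {"avg_seconds": avg_seconds[u], **users[u]} for u in avg_seconds}

# Transformation: keep only the month of the year instead of the full date.
click_months = [click["day"].month for click in clicks]
```

Each step produces a plain dictionary or list, so the output of one step can feed the next.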

But this process can be computationally intensive and time-consuming with large-scale data. To address this, we can use distributed processing frameworks. We need:

  1. Cluster resource management: This involves managing the resources of the cluster, such as CPU, memory, and storage.
  2. Computational dependency management: This involves managing the dependencies between different processing tasks, ensuring that they are executed in the correct order.
  3. Result persistence: This involves saving the final results to HDFS or other storage systems.
  4. Data locality: It is better for processing to share the same HDFS cluster as storage, so we do not need to transfer data between different clusters.
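
Computational dependency management boils down to ordering tasks so that each one runs after its inputs. A minimal sketch with the standard library's `graphlib` follows (Python 3.9+); the task names are hypothetical.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it can start.
task_graph = {
    "load_clicks": set(),
    "load_users": set(),
    "aggregate_clicks": {"load_clicks"},
    "join_features": {"aggregate_clicks", "load_users"},
    "write_to_storage": {"join_features"},
}

# One valid execution order that respects every dependency.
execution_order = list(TopologicalSorter(task_graph).static_order())
```

A distributed framework does the same kind of ordering, and additionally runs independent tasks such as the two loads in parallel.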

Apache YARN

Apache YARN (Yet Another Resource Negotiator) is a cluster management technology that is part of the Hadoop ecosystem. It is responsible for managing and scheduling resources across a cluster of machines, allowing multiple applications to share the same cluster resources efficiently.

Spark can be configured to use YARN as its cluster manager. Spark then runs on top of YARN and relies on it for resource allocation and job scheduling.

  • Resource Manager: The cluster-wide master that arbitrates resources among all applications. It has two parts:
    • Scheduler: Allocates cluster resources to running applications.
    • Applications Manager: Accepts jobs to be run on the cluster and starts the Application Master for each one.
  • Node Manager (per node): It is responsible for managing the resources on an individual node in the cluster and monitoring its health.
    • It launches and monitors the containers granted by the Resource Manager.
    • It reports resource usage to the Resource Manager.
  • Application Master (per application): It negotiates with the Scheduler for containers and works with the Node Managers to run tasks in them.
  • Containers: An abstraction representing a collection of resources (CPU, memory, etc.) on a single node in the cluster.
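
The idea of a container as a slice of one node's resources can be sketched with a toy allocator. The first-fit policy, the `Node` class, and the resource numbers are assumptions for illustration; YARN's real schedulers (capacity, fair) are far more elaborate.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Free resources on one worker node, as a Node Manager might report them."""
    name: str
    free_memory_mb: int
    free_vcores: int
    containers: list = field(default_factory=list)


def allocate(nodes: list, memory_mb: int, vcores: int, count: int) -> list:
    """Grant up to `count` containers, first-fit over the nodes."""
    granted = []
    for _ in range(count):
        for node in nodes:
            if node.free_memory_mb >= memory_mb and node.free_vcores >= vcores:
                node.free_memory_mb -= memory_mb
                node.free_vcores -= vcores
                container_id = f"{node.name}-c{len(node.containers)}"
                node.containers.append(container_id)
                granted.append(container_id)
                break   # this request is satisfied, move to the next one
    return granted


cluster = [Node("node1", 8192, 4), Node("node2", 8192, 4)]
granted = allocate(cluster, memory_mb=3072, vcores=2, count=4)
```

Each node can host two such containers before it runs out of vcores, so a fifth request would come back empty until resources are released.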

Apache Spark

Apache Spark is a distributed data processing framework that provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. It is designed to be fast and general-purpose, making it suitable for a wide range of data processing tasks, including batch processing, stream processing, machine learning, and graph processing.

  • Driver: The driver program is the main entry point for a Spark application. It is responsible for orchestrating the execution of the application and managing the overall lifecycle of the Spark job.
    • Convert user’s code to a set of tasks (unit of work)
    • Schedule tasks across executor nodes.
  • Cluster Manager: Allocates resources for the application (YARN in this setup).
  • Executor: Runs tasks and returns results to the driver.

flowchart TB
    CLIENT(["Client\n(spark-submit)"])

    subgraph RM["Resource Manager (YARN Master)"]
        SCHED["Scheduler\n(capacity / fair)"]
        APPMGR["Applications Manager\n(accepts jobs, tracks AMs)"]
    end

    subgraph NM1["Node Manager — Worker Node 1"]
        direction TB
        NM1_SVC["NodeManager Service\n• heartbeat to RM\n• report CPU / MEM\n• launch containers"]
        subgraph C1["Container — Spark ApplicationMaster"]
            AM["ApplicationMaster\n+ Spark Driver\n• DAG → stages → tasks\n• negotiate executors"]
        end
        subgraph C2["Container — Executor 1"]
            T1["Task 1"]
            T2["Task 2"]
            CACHE1[("Block\nCache")]
        end
        subgraph C3["Container — Executor 2"]
            T3["Task 3"]
            T4["Task 4"]
            CACHE2[("Block\nCache")]
        end
    end

    subgraph NM2["Node Manager — Worker Node 2"]
        direction TB
        NM2_SVC["NodeManager Service\n• heartbeat to RM\n• report CPU / MEM\n• launch containers"]
        subgraph C4["Container — Executor 3"]
            T5["Task 5"]
            T6["Task 6"]
            CACHE3[("Block\nCache")]
        end
        subgraph C5["Container — Executor 4"]
            T7["Task 7"]
            T8["Task 8"]
            CACHE4[("Block\nCache")]
        end
    end

    HDFS[("HDFS / S3\n(data splits)")]

    %% ── Job submission ──────────────────────────────────────────
    CLIENT -- "1 · submit app" --> APPMGR
    APPMGR -- "2 · allocate AM container" --> SCHED
    SCHED -- "3 · launch AM on node 1" --> NM1_SVC
    NM1_SVC -- "4 · start container" --> AM

    %% ── Executor negotiation ────────────────────────────────────
    AM -- "5 · request executor containers" --> SCHED
    SCHED -- "6 · grant containers on NM1" --> NM1_SVC
    SCHED -- "6 · grant containers on NM2" --> NM2_SVC
    NM1_SVC -- "7 · launch" --> C2
    NM1_SVC -- "7 · launch" --> C3
    NM2_SVC -- "7 · launch" --> C4
    NM2_SVC -- "7 · launch" --> C5

    %% ── Task dispatch ───────────────────────────────────────────
    AM -- "8 · dispatch tasks" --> T1
    AM -- "8 · dispatch tasks" --> T3
    AM -- "8 · dispatch tasks" --> T5
    AM -- "8 · dispatch tasks" --> T7

    %% ── Data reads ──────────────────────────────────────────────
    HDFS -- "data split" --> T1
    HDFS -- "data split" --> T3
    HDFS -- "data split" --> T5
    HDFS -- "data split" --> T7

    %% ── Node Manager heartbeats ─────────────────────────────────
    NM1_SVC -- "heartbeat / resource report" --> RM
    NM2_SVC -- "heartbeat / resource report" --> RM

    %% ── Task results ────────────────────────────────────────────
    T2 -- "result" --> AM
    T4 -- "result" --> AM
    T6 -- "result" --> AM
    T8 -- "result" --> AM

    %% ── Styling ─────────────────────────────────────────────────
    classDef rmBox   fill:#dbeafe,stroke:#3b82f6,color:#1e3a5f
    classDef nmBox   fill:#dcfce7,stroke:#22c55e,color:#14532d
    classDef exec    fill:#fef9c3,stroke:#eab308,color:#713f12
    classDef driver  fill:#ede9fe,stroke:#8b5cf6,color:#3b0764
    classDef store   fill:#fee2e2,stroke:#ef4444,color:#7f1d1d

    class RM,SCHED,APPMGR rmBox
    class NM1,NM2,NM1_SVC,NM2_SVC nmBox
    class C2,C3,C4,C5,T1,T2,T3,T4,T5,T6,T7,T8 exec
    class C1,AM driver
    class HDFS store
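
The driver and executor roles can be imitated on a single machine with a thread pool. This is an analogy only, not the Spark API: `make_tasks` plays the driver splitting work into tasks, and the pool workers play the executors.

```python
from concurrent.futures import ThreadPoolExecutor


def make_tasks(data: list, n_partitions: int) -> list:
    """Driver side: split the input into one task per partition."""
    size = -(-len(data) // n_partitions)   # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def run_task(partition: list) -> int:
    """Executor side: compute a partial result for one partition."""
    return sum(x * x for x in partition)


data = list(range(1, 9))
tasks = make_tasks(data, n_partitions=4)

# Two workers stand in for executors; results come back in task order.
with ThreadPoolExecutor(max_workers=2) as executors:
    partial_results = list(executors.map(run_task, tasks))

# Driver side: combine the partial results into the final answer.
total = sum(partial_results)
```

In a real cluster the partitions would be data splits read from HDFS or S3, and the executors would be containers granted by YARN.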

Processing Orchestration