Real-Time Streaming Analytics Pipeline with Kinesis, Flink, and OpenSearch πŸš€

Hi there πŸ‘‹, I'm Tanishka Marrott

Today, I'll take you through an exciting journey: integrating the primary phases of data ingestion, processing, storage, and visualization into a single cohesive pipeline using AWS services like Kinesis, Lambda, Glue, Flink (via Kinesis Data Analytics), and OpenSearch.

Key Focus:

➑️ We've optimized for non-functional aspects like scalability and performance throughout.

Project Workflow

Here's an overview of our Streaming Analytics pipeline:
Link to the repo: Real-time Streaming Analytics with Kinesis Repo

Data Ingestion Layer

We've utilized Kinesis Data Streams for capturing and storing real-time streaming data. For implementation details, see EnhancedProducer.java.

What does the Kinesis producer workflow look like? πŸ€”

  1. Initialize the Producer Configuration:

    • Set crucial parameters like timeouts, record TTL, and max connections to tune the producer's behavior.
    KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion(REGION)
        .setRecordTtl(30000);
  2. Instantiate the Producer Instance:

    • With all necessary configurations specified, we'll create the producer instance.
    try (KinesisProducer producer = new KinesisProducer(config)) {
        // Producer logic here
    }
  3. Read Data from the Telemetry CSV File:

    • Standardize the format, making it suitable for streaming.
    List<Trip> trips = TripReader.readFile(new File("data/taxi-trips.csv"));
  4. Set Up the ExecutorService:

    • Manage multiple threads for increased concurrency and throughput.
    int coreCount = Runtime.getRuntime().availableProcessors(); // size the pool off the machine's cores
    ExecutorService executor = Executors.newFixedThreadPool(coreCount * 2);
  5. Asynchronous Data Ingestion:

    • Utilize CompletableFuture for making the data ingestion process fully asynchronous.
    trips.forEach(trip -> CompletableFuture.runAsync(() -> sendRecord(producer, trip, 0, 100), executor));
  6. Data Integrity and Reliability:

    • Check responses, log successful shard IDs, and capture error messages for failed ones, retrying with progressive backoff (see the full sendRecord sketch after this list).
    private static void sendRecord(KinesisProducer producer, Trip trip, int attemptCount, long backoff) {
        // Send record logic here
    }
  7. Graceful Shutdown:

    • Ensure all tasks complete by shutting down the ExecutorService first, then flushing the Kinesis Producer.
    executor.shutdown();
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        executor.shutdownNow();
    }
    producer.flushSync(); // flush any records still buffered in the KPL
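
Here's a minimal sketch of how the sendRecord retry-with-backoff could be wired up. It's illustrative, not the exact code from EnhancedProducer.java: the MAX_ATTEMPTS cap, the backoff doubling, the LOG handle, and the STREAM_NAME / trip.getId() / serialization details are all assumptions.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.UserRecordResult;
    import com.google.common.util.concurrent.*;

    private static final int MAX_ATTEMPTS = 5; // hypothetical retry cap

    private static void sendRecord(KinesisProducer producer, Trip trip,
                                   int attemptCount, long backoffMillis) {
        // Serialize the trip; the exact payload format is project-specific.
        ByteBuffer payload = ByteBuffer.wrap(
                trip.toString().getBytes(StandardCharsets.UTF_8));
        ListenableFuture<UserRecordResult> future =
                producer.addUserRecord(STREAM_NAME, trip.getId(), payload);

        Futures.addCallback(future, new FutureCallback<UserRecordResult>() {
            @Override
            public void onSuccess(UserRecordResult result) {
                // Log the destination shard for traceability.
                LOG.info("Record sent to shard {}", result.getShardId());
            }

            @Override
            public void onFailure(Throwable t) {
                if (attemptCount < MAX_ATTEMPTS) {
                    try {
                        Thread.sleep(backoffMillis); // progressive backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Retry with double the wait, up to the cap.
                    sendRecord(producer, trip, attemptCount + 1, backoffMillis * 2);
                } else {
                    LOG.error("Record failed after {} attempts", attemptCount, t);
                }
            }
        }, MoreExecutors.directExecutor());
    }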

Design Decisions for the Ingestion Layer πŸ› οΈ

A β†’ On-Demand Capacity Mode for KDS:

πŸ“ Our data stream scales automatically with workload variations, so no manual handling of shard capacities is needed. It automatically scales based on the influx of streaming data πŸ‘.

B β†’ Optimized Thread Management:

We initially used ExecutorService alone but ran into blocking calls with Future.get(). Transitioning to CompletableFuture made the workflow fully asynchronous, improving efficiency and throughput; the contrast is sketched below.
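
To make the difference concrete, here's a simplified fragment (reusing the producer, trip, and executor variables from the workflow above; checked exceptions elided):

    // Before: Future.get() blocks the submitting thread on every record.
    Future<?> pending = executor.submit(() -> sendRecord(producer, trip, 0, 100));
    pending.get(); // stalls here until this one record finishes

    // After: runAsync hands the work to the pool and returns immediately,
    // so many sends can be in flight concurrently.
    CompletableFuture.runAsync(() -> sendRecord(producer, trip, 0, 100), executor);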

C β†’ Dynamically Sized Thread Pool:

We sized the pool at 2 Γ— the number of CPU cores to handle both CPU-bound and I/O-bound work effectively, balancing resource efficiency and throughput.

D β†’ Retry + Progressive Backoff Mechanism:

Implemented to handle transient errors gracefully (see the sendRecord sketch above), ensuring operational stability and service continuity. Progressive backoff eases pressure on a struggling downstream service, improves data consistency, and makes the retry policy predictable.

Data Transformation Layer for Workflow #1:

Services Utilized: Kinesis Data Firehose + Glue

Why Firehose and Glue?

  • Glue: Acts as a central metadata repository through the data catalog, enhancing Athena's querying capabilities.

  • Firehose: Loads streaming data into S3, referencing the schema definitions stored in Glue.

Data Transformations Using Lambda:

  1. Lightweight processing logic to avoid bottlenecks and high memory usage (see the sketch after this list).

  2. Offloaded complex data processing to Flink in KDA for efficiency.
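
Below is a hedged sketch of what such a lightweight transformation handler can look like in Java. The Map-based event/response shapes follow the documented Firehose transformation contract (recordId, result, base64 data); the class name and the trim/newline logic are illustrative, not this project's exact code.

    import java.nio.charset.StandardCharsets;
    import java.util.*;
    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.RequestHandler;

    public class FirehoseTransformHandler
            implements RequestHandler<Map<String, Object>, Map<String, Object>> {

        @Override
        @SuppressWarnings("unchecked")
        public Map<String, Object> handleRequest(Map<String, Object> event, Context ctx) {
            List<Map<String, Object>> records =
                    (List<Map<String, Object>>) event.get("records");
            List<Map<String, Object>> out = new ArrayList<>();

            for (Map<String, Object> record : records) {
                byte[] decoded = Base64.getDecoder().decode((String) record.get("data"));
                // Keep the per-record work lightweight: normalize the payload
                // and append a newline so records land cleanly in S3.
                String transformed =
                        new String(decoded, StandardCharsets.UTF_8).trim() + "\n";

                Map<String, Object> result = new HashMap<>();
                result.put("recordId", record.get("recordId")); // must echo the input id
                result.put("result", "Ok"); // or "Dropped" / "ProcessingFailed"
                result.put("data", Base64.getEncoder()
                        .encodeToString(transformed.getBytes(StandardCharsets.UTF_8)));
                out.add(result);
            }
            return Collections.singletonMap("records", out);
        }
    }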

Non-Functional Enhancements:

  • Buffer Size and Interval Configurations: A trade-off between latency and cost (see the sketch after this list).

  • Data Compression and Formats: Used Snappy for compression and Parquet for columnar storage to reduce data size and improve query performance.

  • Error Handling: Segregated error outputs in S3 for faster troubleshooting and recovery.
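
As an illustration of the buffering trade-off, here's a hedged sketch using the AWS SDK for Java v2; the 64 MB / 60 s values, bucket, role, and stream name are assumptions, and the Parquet format conversion is configured separately on the delivery stream:

    import software.amazon.awssdk.services.firehose.FirehoseClient;
    import software.amazon.awssdk.services.firehose.model.*;

    FirehoseClient firehose = FirehoseClient.create();

    ExtendedS3DestinationConfiguration s3Config =
            ExtendedS3DestinationConfiguration.builder()
                    .bucketARN("arn:aws:s3:::my-analytics-bucket")           // hypothetical
                    .roleARN("arn:aws:iam::123456789012:role/firehose-role") // hypothetical
                    .bufferingHints(BufferingHints.builder()
                            .sizeInMBs(64)         // bigger buffers: fewer, larger S3 objects (cheaper, higher latency)
                            .intervalInSeconds(60) // shorter interval: fresher data (more objects, higher cost)
                            .build())
                    .build();

    firehose.createDeliveryStream(CreateDeliveryStreamRequest.builder()
            .deliveryStreamName("trip-delivery-stream") // hypothetical
            .extendedS3DestinationConfiguration(s3Config)
            .build());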

Stream Processing & Visualization

Key Service: Kinesis Data Analytics (KDA)

Workflow #2:

  • Streaming data is processed by a Flink SQL application deployed on KDA Studio.

  • Processed data is sent to OpenSearch for visualization and analytics of historical data.

Why Flink and OpenSearch?

  • Flink excels at real-time insights and stateful computations on streaming data (a windowed-aggregation sketch follows this list).

  • OpenSearch complements Flink by providing search and analytics capabilities for historical data.
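
To give a flavor of the SQL, here's a hedged sketch using Flink's Java Table API (in KDA Studio the same statements run in notebook cells). The table schema, stream name, and window size are illustrative assumptions, not the project's exact definitions:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Source table over the Kinesis stream (hypothetical schema/options).
    tEnv.executeSql(
            "CREATE TABLE trips (" +
            "  pickup_location STRING," +
            "  total_amount DOUBLE," +
            "  proc_time AS PROCTIME()" +
            ") WITH (" +
            "  'connector' = 'kinesis'," +
            "  'stream' = 'trip-stream'," +
            "  'aws.region' = 'us-east-1'," +
            "  'format' = 'json')");

    // Stateful, windowed aggregation: trip counts and revenue per minute.
    tEnv.executeSql(
            "SELECT pickup_location," +
            "       COUNT(*) AS trip_count," +
            "       SUM(total_amount) AS revenue " +
            "FROM trips " +
            "GROUP BY pickup_location," +
            "         TUMBLE(proc_time, INTERVAL '1' MINUTE)")
        .print();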

Optimizing Non-Functional Aspects:

  1. KDA Parallelism: Auto-scales based on workload requirements, ensuring efficient resource utilization.

  2. Logging and Monitoring: Implemented for troubleshooting and recovery.

  3. Checkpointing: Configured for data recovery and consistency (see the sketch after this list).

  4. Cost Alerts and IAM Policies: Ensured budget considerations and access security.
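
For checkpointing specifically, here's a minimal sketch of the Flink-side configuration (hedged: on KDA the service manages these settings for you, and the intervals shown are illustrative):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    env.enableCheckpointing(60_000); // snapshot state every 60 s
    env.getCheckpointConfig()
       .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // consistency guarantee
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // breathing room between checkpoints
    env.getCheckpointConfig().setCheckpointTimeout(120_000); // abort checkpoints that hang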

Conclusion:

Thank you for accompanying me on this journey! Here’s a quick recap of what we built today:

Workflow #1: Data Ingestion to Storage

  • Ingested data via Kinesis Data Streams

  • Transferred to S3 via Firehose

  • Managed schema in Glue

  • Enriched and standardized data via Lambda

  • Stored in S3

Workflow #2: Stream Processing and Visualization

  • Flink application on KDA Studio for real-time processing

  • Aggregated data

  • S3 as durable data store

  • Visualized data with OpenSearch

Acknowledgements: Special thanks to AWS Workshops for their resources and support in this project's development.

About: This project focuses on real-time data streaming with Kinesis, using Flink for advanced processing, and OpenSearch for analytics. This architecture handles the complete lifecycle of data from ingestion to actionable insights, making it a comprehensive solution.

Feel free to reach out with any questions or feedback! Let's connect on LinkedIn and check out my GitHub for more projects.



Happy Coding! πŸš€
