  • Kinesis
    Data Engineering 2019. 9. 20. 00:42

    Kinesis Data Stream

    • Real-time Data Stream
    • Retention from 1 day to 365 days
    • Ability to reprocess (replay) data
    • Once data is inserted in Kinesis, it can’t be deleted (immutability)
    • Data that shares the same partition key goes to the same shard (ordering)
    • Producers: AWS SDK, Kinesis Producer Library (KPL), Kinesis Agent
    • Consumers
      • Write your own: Kinesis Client Library (KCL), AWS SDK
      • Managed: AWS Lambda, Kinesis Data Firehose, Kinesis Data Analytics

    Capacity Modes

    Provisioned mode

    • You choose the number of shards provisioned, scale manually or use API
    • Each shard gets 1 MB/s in (or 1000 records per second)
    • Each shard gets 2 MB/s out (classic or enhanced fan-out consumer)
    • You pay per shard provisioned per hour

    On-demand mode

    • No need to provision or manage the capacity
    • Default capacity provisioned (4 MB/s in or 4000 records per second)
    • Scales automatically based on the observed throughput peak during the last 30 days
    • Pay per stream per hour & data in/out per GB
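
    A minimal boto3 sketch of creating a stream in each capacity mode (the stream names and shard count below are placeholder values, not from the post):

      import boto3

      kinesis = boto3.client("kinesis")

      # Provisioned mode: you choose the shard count (1 MB/s in, 2 MB/s out per shard)
      kinesis.create_stream(
          StreamName="orders-provisioned",   # placeholder name
          ShardCount=4,                      # scale manually or via UpdateShardCount
          StreamModeDetails={"StreamMode": "PROVISIONED"},
      )

      # On-demand mode: no shard count to manage, capacity scales automatically
      kinesis.create_stream(
          StreamName="orders-on-demand",     # placeholder name
          StreamModeDetails={"StreamMode": "ON_DEMAND"},
      )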

    Security

    • Control access/authorization using IAM policies
    • Encryption in flight using HTTPS endpoints
    • Encryption at rest using KMS
    • You can implement encryption/decryption of data on the client-side (harder)
    • VPC Endpoints are available so Kinesis can be accessed from within a VPC
    • Monitor API calls using CloudTrail

    Architecture

    Producer

    Producers send data into Kinesis Data Stream

    Can be in many forms: Applications, multi-platform clients, AWS SDK, Kinesis Producer Library (KPL), Kinesis Agent, Apache Spark, Log4j Appenders, Flume, Kafka Connect, NiFi, and so on.

    Record

    Partition key: determines which shard the record goes to

    Data Blob: The value itself

    Fan-out

    1 MB/sec or 1000 msg/sec per shard. If you have 6 shards, you get 6 MB/sec or 6000 msg/sec

    Kinesis Data Streams

    Kinesis Data Stream is made of multiple shards

    Shards

    • Shards are numbered
    • Define data stream capacity in terms of ingestion and consumption rates

    Consumer

    Many forms: Applications, SDK, Kinesis Client Library (KCL), Lambda, Kinesis Data Firehose, Kinesis Data Analytics

    Record

    Partition key: determines which shard the record goes to

    Sequence number: represents the record's position within the shard

    Data Blob: The value itself

    Fan-out

    1. 2 MB/sec (shared) per shard across all consumers
    2. 2 MB/sec (enhanced) per shard per consumer

    Kinesis Producer SDK - PutRecord(s)

    • APIs used: PutRecord (one record) and PutRecords (many records)
    • PutRecords uses batching and increases throughput → fewer HTTP requests
    • ProvisionedThroughputExceeded if we go over the limits
    • AWS Mobile SDK: Android, iOS, etc.
    • Use case: low throughput, higher latency, simple API, AWS Lambda
    • Managed AWS sources for Kinesis Data Streams:
      • CloudWatch Logs
      • AWS IoT
      • Kinesis Data Analytics
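
    A minimal boto3 sketch of the two producer APIs (stream name and payloads are placeholders):

      import boto3

      kinesis = boto3.client("kinesis")

      # Single record: the partition key decides which shard the record lands on
      kinesis.put_record(
          StreamName="my-stream",            # placeholder
          Data=b'{"user_id": "u-1", "action": "click"}',
          PartitionKey="u-1",
      )

      # Batched records: many records in one HTTP request → higher throughput
      kinesis.put_records(
          StreamName="my-stream",
          Records=[
              {"Data": b'{"user_id": "u-2"}', "PartitionKey": "u-2"},
              {"Data": b'{"user_id": "u-3"}', "PartitionKey": "u-3"},
          ],
      )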

    AWS Kinesis API - Exceptions

    ProvisionedThroughputExceeded Exceptions

    • Happens when sending more data (exceeding MB/s or TPS for any shard)
    • Make sure you don’t have a hot shard (e.g., your partition key is bad and too much data goes to that partition)

    Solution

    • Retries with backoff
    • Increase shards (scaling)
    • Ensure your partition key is a good one
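
    A sketch of retry-with-backoff around PutRecord, assuming a boto3 producer (function and stream names are placeholders):

      import time
      import boto3
      from botocore.exceptions import ClientError

      kinesis = boto3.client("kinesis")

      def put_with_backoff(stream, data, partition_key, max_retries=5):
          # Retry with exponential backoff when a shard is throttled
          for attempt in range(max_retries):
              try:
                  return kinesis.put_record(
                      StreamName=stream, Data=data, PartitionKey=partition_key
                  )
              except ClientError as e:
                  if e.response["Error"]["Code"] != "ProvisionedThroughputExceededException":
                      raise
                  time.sleep((2 ** attempt) * 0.1)   # 100 ms, 200 ms, 400 ms, ...
          raise RuntimeError("still throttled after retries")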

    Kinesis Producer Library (KPL)

    • Easy to use and highly configurable C++/Java library
    • Used for building high-performance, long-running producers
    • Automated and configurable retry mechanism
    • Synchronous and Asynchronous API (better performance for async)
    • Submits metrics to CloudWatch for monitoring
    • Batching (both turned on by default) - increases throughput, decreases cost:
      • Collect: batches records and writes to multiple shards in the same PutRecords API call
      • Aggregate: adds latency, but:
        • Capability to store multiple records in one record (go over the 1000 records per second limit)
        • Increases payload size and improves throughput (helps reach the 1 MB/s limit)
    • Compression must be implemented by the user
    • KPL records must be decoded with the KCL or a special helper library

    Batching

    • A record is not sent right away; instead, several records are aggregated into one record
    • Several aggregated records are also collected and sent in one PutRecords call
    • We can influence the batching efficiency by introducing some delay with RecordMaxBufferedTime (default 100ms)

    When not to use

    • The KPL can incur an additional processing delay of up to RecordMaxBufferedTime within the library (user-configurable)
    • Larger values for RecordMaxBufferedTime result in higher packing efficiencies and better performance
    • Applications that cannot tolerate this additional delay may need to use the AWS SDK directly

    Kinesis Agent

    • Monitors log files and sends them to Kinesis Data Streams
    • Java-based agent, built on top of KPL
    • Install in Linux-based server environments

    Features

    • Write from multiple directories and write to multiple streams
    • Routing feature based on directory / log file
    • Pre-process data before sending to streams (single line, CSV to JSON, log to JSON, and so on)
    • The agent handles file rotation, checkpointing, retry upon failures
    • Emits metrics to CloudWatch for monitoring

    Kinesis Consumer SDK - GetRecords

    • Classic Kinesis - Records are polled by consumers from a shard
    • Each shard has 2 MB total aggregate throughput
    • GetRecords returns up to 10 MB of data (then throttle for 5 seconds) or up to 10000 records
    • Maximum of 5 GetRecords API calls per shard per second = 200 ms latency
    • If 5 consumer applications consume from the same shard, each consumer can poll once per second and receive less than 400 KB/s
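
    A minimal boto3 polling sketch using the classic GetRecords path (stream name is a placeholder; only the first shard is read):

      import time
      import boto3

      kinesis = boto3.client("kinesis")
      stream = "my-stream"                      # placeholder

      # Read only the first shard for illustration
      shard_id = kinesis.describe_stream(StreamName=stream)["StreamDescription"]["Shards"][0]["ShardId"]
      iterator = kinesis.get_shard_iterator(
          StreamName=stream,
          ShardId=shard_id,
          ShardIteratorType="TRIM_HORIZON",     # start from the oldest retained record
      )["ShardIterator"]

      while iterator:
          resp = kinesis.get_records(ShardIterator=iterator, Limit=1000)
          for record in resp["Records"]:
              print(record["SequenceNumber"], record["Data"])
          iterator = resp["NextShardIterator"]
          time.sleep(0.2)                       # stay under 5 GetRecords calls per shard per second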

    Kinesis Client Library (KCL)

    • Java-first library but exists for other languages too (Golang, Python, Ruby, Node, .Net, and so on)
    • Read records from Kinesis produced with the KPL (de-aggregation)
    • Share multiple shards with multiple consumers in one group, shard discovery
    • Checkpoint feature to resume progress
    • Leverage DynamoDB for coordination and checkpointing (one row per shard)
      • Make sure you provision enough Write Capacity Units (WCU) / Read Capacity Units (RCU)
      • Or use On-Demand for DynamoDB
      • Otherwise DynamoDB may slow down KCL
    • Record processors will process the data
    • ExpiredIteratorException → increase WCU

    Kinesis Connector Library

    • Older Java library (2016), leverages the KCL library
    • Write data to:
      • S3
      • DynamoDB
      • Redshift
      • ElasticSearch
    • Kinesis Firehose replaces the Connector Library for a few of these targets, Lambda for the others

    Lambda sourcing from Kinesis

    • Lambda can source records from Kinesis Data Streams
    • Lambda consumer has a library to de-aggregate records from the KPL
    • Lambda can be used to run lightweight ETL to:
      • S3
      • DynamoDB
      • Redshift
      • ElasticSearch
      • Anywhere you want
    • Lambda can be used to trigger notifications / send emails in real time
    • Lambda has a configurable batch size (more in Lambda section)
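
    A minimal sketch of a Lambda handler sourcing from Kinesis (Kinesis records arrive base64-encoded in the event payload; the JSON payload format is an assumption):

      import base64
      import json

      def handler(event, context):
          # Each Kinesis record in the Lambda event is base64-encoded
          for record in event["Records"]:
              payload = base64.b64decode(record["kinesis"]["data"])
              doc = json.loads(payload)          # assumes producers send JSON
              # ... lightweight ETL: write to S3 / DynamoDB / Redshift / ElasticSearch / anywhere
              print(record["kinesis"]["partitionKey"], doc)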

    Kinesis Enhanced Fan Out

    • New game-changing feature from August 2018
    • Works with KCL 2.0 and AWS Lambda (Nov 2018)
    • Each consumer gets 2 MB/s of provisioned throughput per shard
    • That means 20 consumers will get 40 MB/s per shard aggregated
    • No more 2 MB/s limit
    • Enhanced Fan Out: Kinesis pushes data to consumers over HTTP
    • Reduced latency (~70 ms)
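
    A hedged boto3 sketch of registering an enhanced fan-out consumer and subscribing to a shard (the stream ARN, consumer name, and shard ID are placeholders):

      import boto3

      kinesis = boto3.client("kinesis")

      # Register a dedicated (enhanced fan-out) consumer on the stream
      consumer = kinesis.register_stream_consumer(
          StreamARN="arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",  # placeholder ARN
          ConsumerName="my-efo-consumer",
      )["Consumer"]

      # Kinesis pushes records to the consumer over HTTP/2 (SubscribeToShard)
      # (the consumer must become ACTIVE before SubscribeToShard succeeds)
      response = kinesis.subscribe_to_shard(
          ConsumerARN=consumer["ConsumerARN"],
          ShardId="shardId-000000000000",
          StartingPosition={"Type": "LATEST"},
      )
      for event in response["EventStream"]:
          for record in event["SubscribeToShardEvent"]["Records"]:
              print(record["Data"])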

    Comparison Enhanced Fan-Out and Standard Consumers

    Standard consumers

    • Low number of consuming applications (1, 2, 3, …)
    • Can tolerate ~200 ms latency
    • Minimize cost

    Enhanced Fan-Out Consumers

    • Multiple Consumer applications for the same Stream
    • Low Latency requirement ~ 70 ms
    • Higher costs
    • Default limit of 5 consumers using enhanced fan-out per data stream

    Kinesis Scaling

    Kinesis Operations - Adding Shards

    • Also called “Shard Splitting”
    • Can be used to increase the Stream capacity (1 MB/s data in per shard)
    • Can be used to divide a “hot shard”
    • The old shard is closed and will be deleted once the data is expired
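
    A boto3 sketch of splitting a shard at the midpoint of its hash key range (stream name is a placeholder; a real split would target the hot shard):

      import boto3

      kinesis = boto3.client("kinesis")

      # Pick a shard and split it in two at the midpoint of its hash key range
      shard = kinesis.describe_stream(StreamName="my-stream")["StreamDescription"]["Shards"][0]
      start = int(shard["HashKeyRange"]["StartingHashKey"])
      end = int(shard["HashKeyRange"]["EndingHashKey"])

      kinesis.split_shard(
          StreamName="my-stream",
          ShardToSplit=shard["ShardId"],
          NewStartingHashKey=str((start + end) // 2),
      )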

    Kinesis Operations - Merging Shards

    • Decrease the Stream capacity and save costs
    • Can be used to group two shards with low traffic
    • Old shards are closed and deleted based on data expiration
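
    The corresponding boto3 sketch for merging two adjacent low-traffic shards (stream name and shard IDs are placeholders):

      import boto3

      kinesis = boto3.client("kinesis")

      # Merge two adjacent shards back into one
      kinesis.merge_shards(
          StreamName="my-stream",                    # placeholder
          ShardToMerge="shardId-000000000001",       # placeholder shard IDs
          AdjacentShardToMerge="shardId-000000000002",
      )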

    Out-of-order records after resharding

    • After a reshard, you can read from child shards
    • However, data you haven’t read yet could still be in the parent
    • If you start reading the child before completing reading the parent, you could read data for a particular hash key out of order
    • After a reshard, read entirely from the parent until you don’t have new records
    • Note: The Kinesis Client Library (KCL) has this logic already built-in, even after resharding operations

    Kinesis Operations - Auto Scaling

    • Auto scaling is not a native feature of Kinesis; it can be implemented with the UpdateShardCount API, for example triggered from CloudWatch alarms and Lambda

    Kinesis Scaling Limitations

    • Resharding cannot be done in parallel. Plan capacity in advance
    • You can only perform one resharding operation at a time and it takes a few seconds
    • For 1000 shards, it takes 30K seconds (8.3 hours) to double the shards to 2000

    You can’t do the following:

    • Scale more than 10x for each rolling 24-hour period for each stream
    • Scale up to more than double your current shard count for a stream
    • Scale down below half your current shard count for a stream
    • Scale up to more than 500 shards in a stream
    • Scale a stream with more than 500 shards down unless the result is fewer than 500 shards
    • Scale up to more than the shard limit for your account
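
    A boto3 sketch of resharding via UpdateShardCount, subject to the limits above (values are placeholders):

      import boto3

      kinesis = boto3.client("kinesis")

      # Uniform scaling splits/merges shards evenly to reach the target count
      kinesis.update_shard_count(
          StreamName="my-stream",                    # placeholder
          TargetShardCount=8,                        # at most double / at least half the current count
          ScalingType="UNIFORM_SCALING",
      )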

    Handling Duplicates

    Producer-side

    • Producer retries can create duplicates due to network timeouts
    • Although the two records have identical data, they also have unique sequence numbers
    • Fix: embed unique record ID in the data to de-duplicate on the consumer side
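
    A sketch of the producer-side fix, embedding a unique record ID in the payload (names and payload shape are illustrative assumptions):

      import json
      import uuid
      import boto3

      kinesis = boto3.client("kinesis")

      # A unique record_id lets the consumer de-duplicate retried records
      event = {"record_id": str(uuid.uuid4()), "user_id": "u-1", "action": "click"}
      kinesis.put_record(
          StreamName="my-stream",                    # placeholder
          Data=json.dumps(event).encode("utf-8"),
          PartitionKey=event["user_id"],
      )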

    Consumer-side

    • Consumer retries can make your application read the same data twice
    • Consumer retries happen when record processors restart:
      • A worker terminates unexpectedly
      • Worker instances are added or removed
      • Shards are merged or split
      • The application is deployed
    • Fixes:
      • Make your consumer application idempotent
      • If the final destination can handle duplicates, handle de-duplication there

    Kinesis Security

    • Control access / authorization using IAM policies
    • Encryption in flight using HTTPS endpoints
    • Encryption at rest using KMS
    • Client side encryption must be manually implemented (harder)
    • VPC Endpoints available for Kinesis to access within a VPC

    Kinesis Data Firehose

    • Fully Managed Service, no administration
    • Near Real Time (60 seconds latency minimum for non full batches)
    • Load data into Redshift / S3 / ElasticSearch / Splunk
    • Automatic scaling
    • Supports many data formats
    • Data Conversions from JSON to Parquet / ORC (only for S3)
    • Data Transformation through AWS Lambda (ex: CSV → JSON)
    • Supports compression when target is Amazon S3 (GZIP, ZIP, and SNAPPY)
    • Only GZIP if the data is further loaded into Redshift
    • Pay for the amount of data going through Firehose
    • Spark / KCL do not read from KDF

    Kinesis Data Firehose Delivery Diagram

    • There are a number of Lambda blueprints available to help you transform the data in Firehose into the format you want
    • If Redshift is your destination, the data actually goes through S3 first, and then a COPY command is issued to load it into Redshift
    • You don’t lose data with Kinesis Data Firehose:
      • either it ends up in your targets,
      • or the transformation failures, delivery failures, and (optionally) the source records can be delivered to another S3 bucket

    Firehose Buffer Sizing

    • Firehose accumulates records in a buffer
    • The buffer is flushed based on time and size rules
    • Buffer Size (ex: 32 MB): if that buffer size is reached, it’s flushed
    • Buffer Time (ex: 2 minutes): if that time is reached, it’s flushed
    • Firehose can automatically increase the buffer size to increase throughput
    • High throughput → Buffer Size will be hit
    • Low throughput → Buffer Time will be hit
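
    A hedged boto3 sketch of creating a Firehose delivery stream to S3 with explicit buffering hints (the role/bucket ARNs and names are placeholders):

      import boto3

      firehose = boto3.client("firehose")

      # Buffer flushes on whichever rule is hit first: 32 MB or 120 seconds
      firehose.create_delivery_stream(
          DeliveryStreamName="logs-to-s3",                                         # placeholder
          DeliveryStreamType="DirectPut",
          ExtendedS3DestinationConfiguration={
              "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",  # placeholder
              "BucketARN": "arn:aws:s3:::my-firehose-bucket",                      # placeholder
              "BufferingHints": {"SizeInMBs": 32, "IntervalInSeconds": 120},
              "CompressionFormat": "GZIP",
          },
      )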

    Kinesis Data Stream vs Firehose

    Streams

    • Going to write custom code (producer/consumer)
    • Real time (~200 ms latency for classic, ~70 ms latency for enhanced fan-out)
    • Must manage scaling (shard splitting/merging)
    • Data Storage for 1 to 365 days, replay capability, multi consumers
    • Use with Lambda to insert data in real-time to ElasticSearch (for example)

    Firehose

    • Fully managed, send to S3, Splunk, Redshift, ElasticSearch
    • Serverless data transformations with Lambda
    • Near real time (lowest buffer time is 1 minute)
    • Automated Scaling
    • No data Storage

    CloudWatch Logs Subscription Filters

    Subscription filter patterns:

    • Near real-time into Amazon ES
    • Real-time load into Amazon ES
    • Real-time analytics into Amazon ES
