
Kinesis Data Stream

  • Real-time Data Stream
  • Retention from 1 day to 365 days
  • Ability to reprocess (replay) data
  • Once data is inserted in Kinesis, it can’t be deleted (immutability)
  • Data that shares the same partition key goes to the same shard (ordering)
  • Producers: AWS SDK, Kinesis Producer Library (KPL), Kinesis Agent
  • Consumers
    • Write your own: Kinesis Client Library (KCL), AWS SDK
    • Managed: AWS Lambda, Kinesis Data Firehose, Kinesis Data Analytics

Capacity Modes

Provisioned mode

  • You choose the number of shards provisioned, scale manually or use API
  • Each shard gets 1 MB/s in (or 1000 records per second)
  • Each shard gets 2 MB/s out (classic or enhanced fan-out consumer)
  • You pay per shard provisioned per hour

On-demand mode

  • No need to provision or manage the capacity
  • Default capacity provisioned (4 MB/s in or 4000 records per second)
  • Scales automatically based on the observed throughput peak during the last 30 days
  • Pay per stream per hour & data in/out per GB
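
As a quick illustration of the two modes, here is a hedged boto3 sketch (the stream names are made up, and the StreamModeDetails parameter requires a reasonably recent boto3):

```python
import boto3

kinesis = boto3.client("kinesis")

# Provisioned mode: you pick the shard count yourself
kinesis.create_stream(
    StreamName="orders-provisioned",        # hypothetical name
    ShardCount=2,
    StreamModeDetails={"StreamMode": "PROVISIONED"},
)

# On-demand mode: no shard count, capacity scales automatically
kinesis.create_stream(
    StreamName="orders-on-demand",          # hypothetical name
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)
```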

Security

  • Control access/authorization using IAM policies
  • Encryption in flight using HTTPS endpoints
  • Encryption at rest using KMS
  • You can implement encryption/decryption of data on the client-side (harder)
  • VPC Endpoints are available for Kinesis to access within VPC
  • Monitor API calls using CloudTrail
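
For example, encryption at rest with KMS can be turned on per stream; a minimal boto3 sketch (the stream name is hypothetical, and the AWS-managed key alias is used here instead of a customer CMK):

```python
import boto3

kinesis = boto3.client("kinesis")

# Enable server-side encryption at rest with the AWS-managed KMS key
kinesis.start_stream_encryption(
    StreamName="orders",                    # hypothetical stream
    EncryptionType="KMS",
    KeyId="alias/aws/kinesis",              # or your own CMK
)
```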

Architecture

Producer

Producers send data into Kinesis Data Stream

Can be in many forms: Applications, multi-platform clients, AWS SDK, Kinesis Producer Library (KPL), Kinesis Agent, Apache Spark, Log4j Appenders, Flume, Kafka Connect, NiFi, and so on.

Record

Partition key: determines which shard the record goes to

Data Blob: The value itself
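
To see why records with the same partition key land on the same shard: Kinesis hashes the partition key with MD5 and maps the result onto a shard's hash key range. A simplified sketch of that idea (the real routing is done server-side against each shard's HashKeyRange, not with a modulo):

```python
import hashlib

def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Simplified illustration: the same key always maps to the same shard."""
    hashed = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return hashed % shard_count

# "user-123" always maps to the same shard, so its records stay ordered
print(shard_for_key("user-123", shard_count=6))
```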

Fan-out

1 MB/sec or 1000 msg/sec per shard. If you have 6 shards, you get 6 MB/sec or 6000 msg/sec

Kinesis Data Streams

A Kinesis Data Stream is made of multiple shards

Shards

  • Shards are numbered
  • Define data stream capacity in terms of ingestion and consumption rates

Consumer

Many forms: Applications, SDK, Kinesis Client Library, Lambda, Kinesis Data Firehose, Kinesis Data Analytics

Record

Partition key: determines which shard the record goes to

Sequence number: represents the record's position within the shard

Data Blob: The value itself

Fan-out

  1. 2 MB/sec (shared) per shard across all consumers
  2. 2 MB/sec (enhanced) per shard per consumer

Kinesis Producer SDK - PutRecord(s)

  • APIs used: PutRecord (one record) and PutRecords (many records) - see the sketch below
  • PutRecords uses batching and increases throughput → fewer HTTP requests
  • ProvisionedThroughputExceeded if we go over the limits
  • AWS Mobile SDK: Android, iOS, etc.
  • Use case: low throughput, higher latency, simple API, AWS Lambda
  • Managed AWS sources for Kinesis Data Streams:
    • CloudWatch Logs
    • AWS IoT
    • Kinesis Data Analytics
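
A minimal boto3 sketch of both calls (stream name and payloads are made up):

```python
import json
import boto3

kinesis = boto3.client("kinesis")

# PutRecord: one record per HTTP request
kinesis.put_record(
    StreamName="orders",
    Data=json.dumps({"order_id": 1}).encode("utf-8"),
    PartitionKey="user-123",
)

# PutRecords: batch several records in one HTTP request (higher throughput)
response = kinesis.put_records(
    StreamName="orders",
    Records=[
        {"Data": json.dumps({"order_id": i}).encode("utf-8"),
         "PartitionKey": f"user-{i % 4}"}
        for i in range(10)
    ],
)
print(response["FailedRecordCount"])  # partial failures must be retried by you
```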

AWS Kinesis API - Exceptions

ProvisionedThroughputExceeded Exceptions

  • Happens when sending more data (exceeding MB/s or TPS for any shard)
  • Make sure you don't have a hot shard (e.g., a bad partition key that sends too much data to one shard)

Solution

  • Retries with backoff
  • Increase shards (scaling)
  • Ensure your partition key is a good one
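
A hedged sketch of retries with exponential backoff around put_records (the helper name and backoff values are arbitrary choices, not an AWS API):

```python
import time
import boto3
from botocore.exceptions import ClientError

kinesis = boto3.client("kinesis")

def put_with_backoff(stream, records, max_attempts=5):
    """Retry ProvisionedThroughputExceededException with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return kinesis.put_records(StreamName=stream, Records=records)
        except ClientError as err:
            if err.response["Error"]["Code"] != "ProvisionedThroughputExceededException":
                raise  # unrelated error: don't retry blindly
            time.sleep(2 ** attempt * 0.1)  # 100 ms, 200 ms, 400 ms, ...
    raise RuntimeError("still throttled after retries - consider adding shards")
```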

Kinesis Producer Library (KPL)

  • Easy to use and highly configurable C++/Java library
  • Used for building high-performance, long-running producers
  • Automated and configurable retry mechanism
  • Synchronous and Asynchronous API (better performance for async)
  • Submits metrics to CloudWatch for monitoring
  • Batching (both turned on by default) - increases throughput, decreases cost:
    • Collect: write records destined for multiple shards in the same PutRecords API call
    • Aggregate: increased latency
      • Capability to store multiple records in one record (go over the 1000 records per second limit)
      • Increase payload size and improve throughput (maximize the 1 MB/s limit)
  • Compression must be implemented by the user
  • KPL records must be decoded with the KCL or a special helper library

Batching

  • A record is not sent right away; instead, several records are aggregated into one record
  • The aggregated records are then collected and sent in a single PutRecords call
  • We can influence the batching efficiency by introducing some delay with RecordMaxBufferedTime (default 100ms)
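
The KPL itself is a C++/Java library, so the snippet below is only a conceptual Python sketch of the Collect behavior: records are buffered until a RecordMaxBufferedTime-like delay or a size limit is reached, then flushed in one PutRecords call. The class name and thresholds are made up and this is not the real KPL API.

```python
import time
import boto3

kinesis = boto3.client("kinesis")

class MiniCollector:
    """Toy illustration of KPL-style collection (not the real KPL API)."""
    def __init__(self, stream, max_buffered_time_ms=100, max_records=500):
        self.stream = stream
        self.max_delay = max_buffered_time_ms / 1000.0
        self.max_records = max_records
        self.buffer, self.first_add = [], None

    def add(self, data: bytes, partition_key: str):
        self.buffer.append({"Data": data, "PartitionKey": partition_key})
        self.first_add = self.first_add or time.monotonic()
        if (len(self.buffer) >= self.max_records
                or time.monotonic() - self.first_add >= self.max_delay):
            self.flush()

    def flush(self):
        if self.buffer:
            kinesis.put_records(StreamName=self.stream, Records=self.buffer)
            self.buffer, self.first_add = [], None
```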

When not to use

  • The KPL can incur an additional processing delay of up to RecordMaxBufferedTime within the library (user-configurable)
  • Larger values for RecordMaxBufferedTime result in higher packing efficiencies and better performance
  • Applications that cannot tolerate this additional delay may need to use the AWS SDK directly

Kinesis Agent

  • Monitors log files and sends them to Kinesis Data Streams
  • Java-based agent, built on top of KPL
  • Install in Linux-based server environments

Features

  • Write from multiple directories and write to multiple streams
  • Routing feature based on directory / log file
  • Pre-process data before sending to streams (single line, CSV to JSON, log to JSON, and so on)
  • The agent handles file rotation, checkpointing, retry upon failures
  • Emits metrics to CloudWatch for monitoring

Kinesis Consumer SDK - GetRecords

  • Classic Kinesis - Records are polled by consumers from a shard
  • Each shard has 2 MB/sec total aggregate read throughput
  • GetRecords returns up to 10 MB of data (then throttle for 5 seconds) or up to 10000 records
  • Maximum of 5 GetRecords API calls per shard per second = 200 ms latency
  • If 5 consumer applications consume from the same shard, each consumer can poll once per second and receives less than 400 KB/s
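
A minimal boto3 polling loop over one shard (stream and shard values are hypothetical; a real consumer would iterate over all shards and stop when NextShardIterator is no longer returned):

```python
import time
import boto3

kinesis = boto3.client("kinesis")

shard_id = kinesis.list_shards(StreamName="orders")["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="orders",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",   # read from the oldest retained record
)["ShardIterator"]

while iterator:
    result = kinesis.get_records(ShardIterator=iterator, Limit=1000)
    for record in result["Records"]:
        print(record["SequenceNumber"], record["Data"])
    iterator = result["NextShardIterator"]
    time.sleep(1)  # stay under 5 GetRecords calls per shard per second
```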

Kinesis Client Library (KCL)

  • Java-first library but exists for other languages too (Golang, Python, Ruby, Node, .Net, and so on)
  • Read records from Kinesis produced with the KPL (de-aggregation)
  • Share multiple shards with multiple consumers in one group, shard discovery
  • Checkpoint feature to resume progress
  • Leverage DynamoDB for coordination and checkpointing (one row per shard)
    • Make sure you provision enough Write Capacity Units (WCU) / Read Capacity Units (RCU)
    • Or use On-Demand for DynamoDB
    • Otherwise DynamoDB may slow down KCL
  • Record processors will process the data
  • ExpiredIteratorException → increase WCU
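
The KCL creates and maintains its DynamoDB lease table automatically; the snippet below is only a rough sketch of what a per-shard checkpoint row looks like, using a hypothetical table name, to make the "one row per shard" idea concrete.

```python
import boto3

# Hypothetical lease table; the real one is created and owned by the KCL
table = boto3.resource("dynamodb").Table("my-kcl-app")

# One item per shard: which worker holds the lease and the last checkpoint
table.put_item(Item={
    "leaseKey": "shardId-000000000000",   # partition key = shard id
    "leaseOwner": "worker-1",
    "checkpoint": "<sequence-number-of-last-processed-record>",
    "leaseCounter": 42,
})
```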

Kinesis Connector Library

  • Older Java library (2016), leverages the KCL library
  • Write data to:
    • S3
    • DynamoDB
    • Redshift
    • ElasticSearch
  • Kinesis Firehose replaces the Connector Library for a few of these targets, Lambda for the others

Lambda sourcing from Kinesis

  • Lambda can source records from Kinesis Data Streams
  • Lambda consumers have a library to de-aggregate records produced with the KPL
  • Lambda can be used to run lightweight ETL to:
    • S3
    • DynamoDB
    • Redshift
    • ElasticSearch
    • Anywhere you want
  • Lambda can be used to trigger notifications / send emails in real time
  • Lambda has a configurable batch size (more in Lambda section)
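
A minimal Lambda handler consuming a Kinesis event (records arrive base64-encoded in the event payload; JSON payloads are an assumption here):

```python
import base64
import json

def handler(event, context):
    """Triggered by a Kinesis event source mapping; one batch per invocation."""
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        data = json.loads(payload)                   # assuming JSON payloads
        key = record["kinesis"]["partitionKey"]
        print(f"partition_key={key} data={data}")
    return {"batchItemCount": len(event["Records"])}
```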

Kinesis Enhanced Fan Out

  • New game-changing feature from August 2018
  • Works with KCL 2.0 and AWS Lambda (Nov 2018)
  • Each consumer gets 2 MB/s of provisioned throughput per shard
  • That means 20 consumers will get 40 MB/s per shard aggregated
  • No more 2 MB/s limit
  • Enhanced Fan Out: Kinesis pushes data to consumers over HTTP/2 (SubscribeToShard API)
  • Reduce latency (~70 ms)
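
A hedged boto3 sketch of enhanced fan-out: register a consumer, then subscribe to a shard and read the pushed event stream. The ARN, names, and shard id are placeholders, and each SubscribeToShard subscription lasts about 5 minutes before it must be renewed.

```python
import boto3

kinesis = boto3.client("kinesis")

consumer = kinesis.register_stream_consumer(
    StreamARN="arn:aws:kinesis:us-east-1:123456789012:stream/orders",  # hypothetical
    ConsumerName="analytics-app",
)["Consumer"]

# Wait until the consumer is ACTIVE (describe_stream_consumer) before subscribing
subscription = kinesis.subscribe_to_shard(
    ConsumerARN=consumer["ConsumerARN"],
    ShardId="shardId-000000000000",
    StartingPosition={"Type": "LATEST"},
)

# Kinesis pushes records over HTTP/2; boto3 exposes them as an event stream
for event in subscription["EventStream"]:
    for record in event["SubscribeToShardEvent"]["Records"]:
        print(record["SequenceNumber"], record["Data"])
```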

Comparison Enhanced Fan-Out and Standard Consumers

Standard consumers

  • Low number of consuming applications (1, 2, 3, …)
  • Can tolerate ~200 ms latency
  • Minimize cost

Enhanced Fan-Out Consumers

  • Multiple Consumer applications for the same Stream
  • Low Latency requirement ~ 70 ms
  • Higher costs
  • Default limit of 5 consumers using enhanced fan-out per data stream

Kinesis Scaling

Kinesis Operations - Adding Shards

  • Also called “Shard Splitting”
  • Can be used to increase the Stream capacity (1 MB/s data in per shard)
  • Can be used to divide a “hot shard”
  • The old shard is closed and will be deleted once the data expires

Kinesis Operations - Merging Shards

  • Decrease the Stream capacity and save costs
  • Can be used to group two shards with low traffic
  • Old shards are closed and deleted based on data expiration
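
Both operations are plain API calls; a hedged boto3 sketch (the stream name, shard ids, and hash key are placeholders):

```python
import boto3

kinesis = boto3.client("kinesis")

# Shard splitting: divide a hot shard at a hash key inside its range
kinesis.split_shard(
    StreamName="orders",
    ShardToSplit="shardId-000000000000",
    # midpoint of the full 0..2^128-1 range (must fall inside the parent shard's range)
    NewStartingHashKey="170141183460469231731687303715884105728",
)

# Shard merging: combine two adjacent, low-traffic shards
kinesis.merge_shards(
    StreamName="orders",
    ShardToMerge="shardId-000000000001",
    AdjacentShardToMerge="shardId-000000000002",
)
```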

Out-of-order records after resharding

  • After a reshard, you can read from child shards
  • However, data you haven’t read yet could still be in the parent
  • If you start reading the child before completing reading the parent, you could read data for a particular hash key out of order
  • After a reshard, read entirely from the parent until you have no more new records
  • Note: The Kinesis Client Library (KCL) has this logic already built-in, even after resharding operations

Kinesis Operations - Auto Scaling
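
Kinesis Data Streams has no built-in auto scaling for provisioned mode; it is typically implemented externally, for example a scheduled Lambda or script that watches CloudWatch metrics and calls UpdateShardCount. A minimal boto3 sketch of the API call itself (stream name and target are placeholders):

```python
import boto3

kinesis = boto3.client("kinesis")

# Uniform scaling redistributes shards evenly across the hash key range
kinesis.update_shard_count(
    StreamName="orders",                  # hypothetical stream
    TargetShardCount=4,
    ScalingType="UNIFORM_SCALING",
)
```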

Kinesis Scaling Limitations

  • Resharding cannot be done in parallel. Plan capacity in advance
  • You can only perform one resharding operation at a time and it takes a few seconds
  • For 1000 shards, it takes 30K seconds (8.3 hours) to double the shards to 2000

You can’t do the following:

  • Scale more than 10x for each rolling 24-hour period for each stream
  • Scale up to more than double your current shard count for a stream
  • Scale down below half your current shard count for a stream
  • Scale up to more than 500 shards in a stream
  • Scale a stream with more than 500 shards down unless the result is fewer than 500 shards
  • Scale up to more than the shard limit for your account

Handling Duplicates

Producer-side

  • Producer retries can create duplicates due to network timeouts
  • The two records have identical data but unique sequence numbers
  • Fix: embed unique record ID in the data to de-duplicate on the consumer side

Consumer-side

  • Consumer retries can make your application read the same data twice
  • Consumer retries happen when record processors restart:
    • A worker terminates unexpectedly
    • Worker instances are added or removed
    • Shards are merged or split
    • The application is deployed
  • Fixes: make your consumer application idempotent; if the final destination can handle duplicates, handle de-duplication there
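
A toy sketch of consumer-side de-duplication using the unique record ID embedded by the producer (an in-memory set is used here only for illustration; a real application would use a durable store such as DynamoDB):

```python
processed_ids = set()   # in production: a durable store, e.g. DynamoDB

def process_once(record: dict):
    """Skip records whose embedded unique ID was already processed."""
    record_id = record["record_id"]     # unique ID embedded by the producer
    if record_id in processed_ids:
        return                          # duplicate delivery: ignore it
    handle(record)                      # your actual business logic
    processed_ids.add(record_id)

def handle(record: dict):
    print("processing", record)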

Kinesis Security

  • Control access / authorization using IAM policies
  • Encryption in flight using HTTPS endpoints
  • Encryption at rest using KMS
  • Client side encryption must be manually implemented (harder)
  • VPC Endpoints available for Kinesis to access within VPC

Kinesis Data Firehose

  • Fully Managed Service, no administration
  • Near real time (60-second minimum latency for non-full batches)
  • Load data into Redshift / S3 / ElasticSearch / Splunk
  • Automatic scaling
  • Supports many data formats
  • Data Conversions from JSON to Parquet / ORC (only for S3)
  • Data Transformation through AWS Lambda (ex: CSV → JSON)
  • Supports compression when target is Amazon S3 (GZIP, ZIP, and SNAPPY)
  • Only GZIP if the data is further loaded into Redshift
  • Pay for the amount of data going through Firehose
  • Spark / KCL do not read from KDF
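
Producers write to Firehose through its own API (no partition key, no shards); a minimal boto3 sketch with a hypothetical delivery stream name:

```python
import json
import boto3

firehose = boto3.client("firehose")

response = firehose.put_record_batch(
    DeliveryStreamName="orders-to-s3",      # hypothetical delivery stream
    Records=[
        {"Data": (json.dumps({"order_id": i}) + "\n").encode("utf-8")}
        for i in range(10)
    ],
)
print(response["FailedPutCount"])           # partial failures must be retried
```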

Kinesis Data Firehose Delivery Diagram

  • There are a number of AWS Lambda blueprints available to help you transform the data in Firehose into the format you want
  • If Redshift is your destination, the data actually goes through S3 first, and then a COPY command loads it into Redshift
  • You don't lose data with Kinesis Data Firehose
    • either it ends up in your targets
    • or you keep a record of all transformation failures, delivery failures, and even the source records, in another S3 bucket
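
The Lambda blueprints all follow the same record contract: the function receives base64-encoded records and must return each recordId with a result of Ok, Dropped, or ProcessingFailed. A minimal sketch of that contract (the 2-column CSV input is an assumption for illustration):

```python
import base64
import json

def handler(event, context):
    """Firehose transformation Lambda: e.g. CSV in, JSON out."""
    output = []
    for record in event["records"]:
        line = base64.b64decode(record["data"]).decode("utf-8").strip()
        user_id, amount = line.split(",")            # assuming a 2-column CSV
        transformed = json.dumps({"user_id": user_id, "amount": float(amount)}) + "\n"
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",                          # or "Dropped" / "ProcessingFailed"
            "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```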

Firehose Buffer Sizing

  • Firehose accumulates records in a buffer
  • The buffer is flushed based on time and size rules
  • Buffer Size (ex:32 MB): if that buffer size is reached, it’s flushed
  • Buffer Time (ex: 2 minutes): if that time is reached, it’s flushed
  • Firehose can automatically increase the buffer size to increase throughput
  • High throughput → Buffer Size will be hit
  • Low throughput → Buffer Time will be hit

Kinesis Data Stream vs Firehose

Streams

  • Going to write custom code (producer/consumer)
  • Real time (~200 ms latency for classic, ~70 ms latency for enhanced fan-out)
  • Must manage scaling (shard splitting/merging)
  • Data Storage for 1 to 365 days, replay capability, multi consumers
  • Use with Lambda to insert data in real-time to ElasticSearch (for example)

Firehose

  • Fully managed, send to S3, Splunk, Redshift, ElasticSearch
  • Serverless data transformations with Lambda
  • Near real time (lowest buffer time is 1 minute)
  • Automated Scaling
  • No data Storage

CloudWatch Logs Subscription Filters

Subscription filter delivery patterns:

  • Near real-time into Amazon ES
  • Real-time load into Amazon ES
  • Real-time analytics into Amazon ES
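
Setting up a subscription filter is a single API call; a hedged boto3 sketch sending a log group to a Kinesis stream (the log group, stream ARN, and role are placeholders, and the role must allow CloudWatch Logs to write to the destination):

```python
import boto3

logs = boto3.client("logs")

logs.put_subscription_filter(
    logGroupName="/aws/lambda/orders-api",   # hypothetical log group
    filterName="to-kinesis",
    filterPattern="",                        # empty pattern = send every log event
    destinationArn="arn:aws:kinesis:us-east-1:123456789012:stream/logs-stream",
    roleArn="arn:aws:iam::123456789012:role/CWLtoKinesisRole",
)
```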