Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects
In February 2019, Amazon Web Services (AWS) announced a new
feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3
Objects. It lets customers specify a custom expression for the Amazon S3 prefix
where data records are delivered. Previously, Kinesis Data Firehose allowed you
to specify only a literal prefix, which was then combined with a static
date-formatted prefix to create the output folder in a fixed format. Customers
asked for flexibility, so AWS listened and delivered.
Kinesis Data Firehose is most commonly used to consume event
data from streaming sources, such as applications or IoT devices. The data is then typically stored in a data
lake, so it can be processed and eventually queried. When storing data on Amazon S3, it is a best
practice to partition or group related data and store it together in the same
folder. This provides the ability to
filter the partitioned data and control the amount of data scanned by each
query, thus improving performance and reducing cost.
A common way to group data is by date. Kinesis Data Firehose automatically groups
data and stores it into the appropriate folders on Amazon S3 based on the
date. However, this default folder naming is not compatible with Apache Hive
naming conventions. This makes
data more difficult to catalog using AWS Glue crawlers and analyze using big
data tools.
This post discusses a new capability that lets us customize
how Kinesis Data Firehose names the output folders in Amazon S3. It covers how
custom prefixes work and their intended use cases, and includes step-by-step
instructions to try the feature in your own account.
The need for custom prefixes for Amazon S3 objects
Previously, Kinesis Data Firehose created a static Coordinated
Universal Time (UTC)-based folder structure in the format YYYY/MM/DD/HH. It
then appended it to the provided prefix before writing objects to Amazon S3.
For example, if you provided a prefix “mydatalake/”, the generated folder
hierarchy would be “mydatalake/2019/02/09/13”.
However, to be compatible with Hive naming conventions, the folder
structure is expected to follow the format “/partitionkey=partitionvalue”. Using this naming convention, data can be
easily cataloged with AWS Glue crawlers, resulting in proper partition names.
Other methods for managing partitions also become possible,
such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon
EMR, which can add all partitions through a single statement. Furthermore, you
can use other date-based partitioning patterns like “/dt=2019-02-09-13/”
instead of expanding the date out into folders.
This is helpful in reducing the total number of partitions that need to
be maintained as the table grows over time. It also simplifies range queries.
Providing the ability to specify custom prefixes obviates the need for an
additional ETL step to put the data in the right folder structure, improving the
time to insight.
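To illustrate, here is a minimal sketch that runs MSCK REPAIR TABLE through Amazon Athena using boto3. The database name, table name, and query-result location are placeholders, not values from this post.

import boto3

# Hypothetical sketch: load all Hive-style partitions with a single statement.
# "my_database", "my_table", and the result bucket below are placeholders.
athena = boto3.client("athena", region_name="us-east-1")

athena.start_query_execution(
    QueryString="MSCK REPAIR TABLE my_table",
    QueryExecutionContext={"Database": "my_database"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-query-results/"},
)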
How custom prefixes for Amazon S3 objects work
This new capability does not let you use any date or timestamp
value from your event data, nor can you use any other arbitrary value in the
event. Kinesis Data Firehose uses an internal timestamp field called
ApproximateArrivalTimestamp. Each data record includes an
ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully
receives and stores the record. This is commonly referred to as a server-side
timestamp. Kinesis Data Firehose buffers incoming records according to the
configured buffering hints and delivers them into Amazon S3 objects for the
Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple
records, each with a different ApproximateArrivalTimestamp. When evaluating
timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the
oldest record that’s contained in the Amazon S3 object being written.
Kinesis Data Firehose also provides the ability to deliver
records to a different error output location when there is a delivery failure, an
AWS Lambda transformation failure, or a format conversion failure. Previously, the error
output location could not be configured and was determined by the type of
delivery failure. With this release, the error output location
(ErrorOutputPrefix) can also be configured. One benefit of this new capability
is that you can separate failed records into date partitioned folders for easy
reprocessing.
So how do you specify the custom Prefix and the
ErrorOutputPrefix? You use an expression of the form !{namespace:value}, where
the namespace can be either firehose or timestamp. The value can be either
“random-string” or “error-output-type” for the firehose namespace, or a date
pattern in the Java DateTimeFormatter format for the timestamp namespace. In a
single expression, you can use a combination of the two namespaces, although
!{firehose:error-output-type} can be used only in the ErrorOutputPrefix. For
more information and examples, see Custom Prefixes for Amazon S3 Objects.
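To make this concrete, a Hive-compatible delivery prefix and a date-partitioned error prefix might look like the following; the base folder names here are placeholders, not values prescribed by the feature.

Prefix: mydatalake/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
ErrorOutputPrefix: myFailures/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/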
Writing streaming data into Amazon S3 with Kinesis Data
Firehose
This walkthrough describes how streaming data can be written
into Amazon S3 with Kinesis Data Firehose using a Hive-compatible folder
structure. It then shows how AWS Glue
crawlers can infer the schema and extract the proper partition names that we
designated in Kinesis Data Firehose, and catalog them in the AWS Glue Data
Catalog. Finally, we run sample queries
to show that partitions are indeed being recognized.
To demonstrate this, we use Python code to generate sample
data. We also use a Lambda transform on
Kinesis Data Firehose to forcibly create failures, which demonstrates how data
can be saved to the error output location. The code that you need for this
walkthrough is available on GitHub.
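As a minimal sketch of such a generator (not the exact code from the GitHub repository), the following script sends synthetic JSON events to a delivery stream with boto3; the stream name and event fields are assumptions.

import json
import random
import time

import boto3

# Hypothetical sample-data generator; the delivery stream name and event
# fields are placeholders, not the walkthrough's exact code.
firehose = boto3.client("firehose", region_name="us-east-1")
STREAM_NAME = "my-delivery-stream"

def generate_event():
    # Synthetic IoT-style event; a real payload would carry application data.
    return {
        "sensorId": random.randint(1, 100),
        "currentTemperature": round(random.uniform(10.0, 35.0), 1),
        "status": random.choice(["OK", "WARN", "FAIL"]),
    }

if __name__ == "__main__":
    for _ in range(100):
        data = (json.dumps(generate_event()) + "\n").encode("utf-8")
        firehose.put_record(DeliveryStreamName=STREAM_NAME, Record={"Data": data})
        time.sleep(0.1)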
For this walkthrough, we build the following architecture: a Python script
generates sample events and sends them to Kinesis Data Firehose, a Lambda
function transforms the records (and forces some failures), and Firehose
delivers the results to Amazon S3, where AWS Glue crawlers catalog them for
querying.
Step 1: Create an Amazon S3 bucket
Create an S3 bucket to be used by Kinesis Data Firehose to
deliver event records. We use the AWS Command Line Interface (AWS CLI) to
create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to
replace the bucket name in the example with your own.
aws s3 mb s3://kdfs3customprefixesexample --region us-east-1
Step 2: Lambda Transform (optional)
The incoming events have an ApproximateArrivalTimestamp
field in the event payload. This is
sufficient to create a proper folder structure on Amazon S3. However, when querying the data it may be
beneficial to expose this timestamp value as a top-level column for easy
filtering and validation. To accomplish
this, we create a Lambda function that adds the ApproximateArrivalTimestamp as
a top-level field in the data payload. The data payload is what Kinesis Data
Firehose writes as an object in Amazon S3. Additionally, the Lambda code
artificially generates some processing errors that are delivered to the
ErrorOutputPrefix location specified for the delivery destination, illustrating
the use of expressions in the ErrorOutputPrefix.
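A minimal sketch of such a transform is shown below; it is not the exact code from the GitHub repository, and the 10 percent failure rate and added field name are assumptions. Kinesis Data Firehose invokes the function with a batch of records and expects each record back with a result of Ok, Dropped, or ProcessingFailed.

import base64
import json
import random

def lambda_handler(event, context):
    # Hypothetical sketch of the transform described above; the failure rate
    # and the added field name are assumptions, not the post's exact code.
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Promote the server-side arrival time (epoch milliseconds) supplied
        # by Kinesis Data Firehose to a top-level field in the payload.
        payload["approximateArrivalTimestamp"] = record["approximateArrivalTimestamp"]

        # Artificially fail ~10% of records so they are delivered to the
        # configured ErrorOutputPrefix instead of the normal Prefix.
        result = "ProcessingFailed" if random.random() < 0.1 else "Ok"

        output.append({
            "recordId": record["recordId"],
            "result": result,
            "data": base64.b64encode(
                (json.dumps(payload) + "\n").encode("utf-8")
            ).decode("utf-8"),
        })
    return {"records": output}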