Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects


In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose allowed specifying only a literal prefix, which was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.
Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices. The data is then typically stored in a data lake, so it can be processed and eventually queried. When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder. This makes it possible to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.
A common way to group data is by date. Kinesis Data Firehose automatically groups data and stores it in the appropriate folders on Amazon S3 based on the date. However, the default folder naming is not compatible with Apache Hive naming conventions, which makes data more difficult to catalog using AWS Glue crawlers and to analyze with big data tools.
This post discusses a new capability that lets us customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work, describes the intended use cases, and includes step-by-step instructions for trying the feature in your own account.
The need for custom prefixes for Amazon S3 objects
Previously, Kinesis Data Firehose created a static folder structure based on Coordinated Universal Time (UTC) in the format YYYY/MM/DD/HH, and appended it to the provided prefix before writing objects to Amazon S3. For example, if you provided a prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”. However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”. Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.
Other methods for managing partitions also become possible, such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders. This helps reduce the total number of partitions that need to be maintained as the table grows over time, and it also simplifies range queries. Providing the ability to specify custom prefixes obviates the need for an additional ETL step to put the data in the right folder structure, improving the time to insight.
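To make this concrete, here is a minimal sketch of issuing MSCK REPAIR TABLE against Amazon Athena from Python with boto3. This is not from the original post; the table name, database, and query result location are hypothetical placeholders.

import boto3

# Athena client; assumes AWS credentials and region are already configured.
athena = boto3.client("athena", region_name="us-east-1")

# MSCK REPAIR TABLE scans the table's S3 location and registers every
# Hive-style partition folder (for example, dt=2019-02-09-13) in one statement.
athena.start_query_execution(
    QueryString="MSCK REPAIR TABLE my_datalake_table",      # hypothetical table name
    QueryExecutionContext={"Database": "my_datalake_db"},   # hypothetical database
    ResultConfiguration={"OutputLocation": "s3://kdfs3customprefixesexample/athena-results/"},
)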
How custom prefixes for Amazon S3 objects work
This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.
Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery, AWS Lambda transformation, or format conversion failure. Previously, the error output location could not be configured; it was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date-partitioned folders for easy reprocessing.
So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form !{namespace:value}, where the namespace can be either firehose or timestamp. The value can be either “random-string” or “error-output-type” for the firehose namespace, or a date pattern in the Java DateTimeFormatter format for the timestamp namespace. In a single expression you can use a combination of the two namespaces, although !{firehose:error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.
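For illustration, the following is a minimal boto3 sketch of creating a delivery stream with a Hive-compatible custom Prefix and a date-partitioned ErrorOutputPrefix. The stream name, IAM role ARN, and bucket are hypothetical placeholders; the prefix expressions use the syntax described above.

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

firehose.create_delivery_stream(
    DeliveryStreamName="kdf-custom-prefix-example",  # hypothetical stream name
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",  # placeholder
        "BucketARN": "arn:aws:s3:::kdfs3customprefixesexample",
        # Hive-compatible, date-partitioned folders built from the timestamp namespace.
        "Prefix": "mydatalake/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
        # Failed records are grouped by failure type, then by date.
        "ErrorOutputPrefix": "mydatalakefailures/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
    },
)

With this Prefix, an object that would previously have landed under mydatalake/2019/02/09/13 is written under mydatalake/year=2019/month=02/day=09/hour=13/ instead.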
Writing streaming data into Amazon S3 with Kinesis Data Firehose
This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive-compatible folder structure. It then shows how AWS Glue crawlers can infer the schema, extract the proper partition names that we designated in Kinesis Data Firehose, and catalog them in the AWS Glue Data Catalog. Finally, we run sample queries to show that partitions are indeed being recognized.
To demonstrate this, we use Python code to generate sample data. We also use a Lambda transform on Kinesis Data Firehose to forcibly create failures, which demonstrates how data can be saved to the error output location. The code that you need for this walkthrough is available on GitHub.
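The actual generator is part of the GitHub code mentioned above; the sketch below conveys the idea, assuming a hypothetical stream name and event shape.

import json
import random
import time

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

# Send small newline-delimited JSON events to the delivery stream.
while True:
    event = {
        "sensorId": random.randint(1, 100),             # hypothetical fields
        "currentTemperature": random.randint(40, 120),
        "status": random.choice(["OK", "FAIL", "WARN"]),
    }
    firehose.put_record(
        DeliveryStreamName="kdf-custom-prefix-example",  # hypothetical stream name
        Record={"Data": (json.dumps(event) + "\n").encode("utf-8")},
    )
    time.sleep(0.1)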
For this walkthrough, we build the following architecture: a Python script generates sample events and sends them to Kinesis Data Firehose, a Lambda transform processes (and occasionally fails) records, and the results are delivered to partitioned folders in Amazon S3.
Step 1: Create an Amazon S3 bucket
Create an S3 bucket to be used by Kinesis Data Firehose to deliver event records. We use the AWS Command Line Interface (AWS CLI) to create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to substitute the bucket name in the example with your own.
aws s3 mb s3://kdfs3customprefixesexample --region us-east-1
Step 2: Lambda Transform (optional)
Each incoming record carries an ApproximateArrivalTimestamp, which Kinesis Data Firehose makes available to the Lambda function alongside the record data. This is sufficient to create a proper folder structure on Amazon S3. However, when querying the data, it may be beneficial to expose this timestamp value as a top-level column for easy filtering and validation. To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top-level field in the data payload, which is what Kinesis Data Firehose writes as an object in Amazon S3. Additionally, the Lambda code artificially generates some processing errors that are delivered to the ErrorOutputPrefix location specified for the delivery destination, to illustrate the use of expressions in the ErrorOutputPrefix. (Source: https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/)
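The actual transform is part of the GitHub code linked above; the following is a minimal sketch of a Firehose transformation handler, assuming JSON payloads. The random failure condition is purely illustrative, to route some records to the ErrorOutputPrefix location.

import base64
import json
import random

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Promote the server-side timestamp (epoch milliseconds) that
        # Kinesis Data Firehose passes alongside each record.
        payload["approximateArrivalTimestamp"] = record["approximateArrivalTimestamp"]
        # Artificially fail some records so they land under ErrorOutputPrefix.
        result = "ProcessingFailed" if random.random() < 0.1 else "Ok"
        output.append({
            "recordId": record["recordId"],
            "result": result,
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}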