October 15, 2024
Pinterest Engineering
Pinterest Engineering Blog

20 min read

Sep 17, 2024

Jeff Xiang | Senior Software Engineer, Logging Platform; Vahid Hashemian | Staff Software Engineer, Logging Platform

When it comes to PubSub solutions, few have achieved higher levels of ubiquity, community support, and adoption than Apache Kafka®️, which has become the industry standard for data transportation at large scale. At Pinterest, petabytes of data are transported through PubSub pipelines every day, powering foundational systems such as AI training, content safety and relevance, and real-time ad bidding, bringing inspiration to hundreds of millions of Pinners worldwide. Given the continuous growth in PubSub-dependent use cases and organic data volume, it became paramount that PubSub storage be scaled to meet growing storage demands while lowering the per-unit cost of storage.

Tiered Storage is a design pattern that addresses this problem by offloading data typically stored on broker disk to a cheaper remote storage, such as Amazon S3®️. This allows the brokers themselves to keep less data on expensive local disks, reducing the overall storage footprint and cost of PubSub clusters. MemQ is a PubSub solution that maximally employs this design pattern by keeping all data in object storage, eliminating the need for local disk storage and decoupling storage from compute.

KIP-405 adopts the Tiered Storage design pattern for open-source Kafka (available in Kafka 3.6.0+). It details a broker-coupled implementation, which natively integrates Tiered Storage functionality into the broker process itself.

At Pinterest, we productionalized and are now open-sourcing our implementation of Tiered Storage for Apache Kafka®️, which is decoupled from the broker. This brings the benefits of storage-compute decoupling from MemQ to Kafka and unlocks key advantages in flexibility, ease of adoption, cost reduction, and resource utilization when compared to the native implementation in KIP-405.

With 20+ production topics onboarded since May 2024, our broker-decoupled Tiered Storage implementation currently offloads ~200 TB of data daily from broker disk to a much cheaper object storage. In this blog, we share the approach we took and the learnings we gained.

Data sent through Kafka is temporarily stored on the broker's disk, replicated across followers for each partition, and removed once the data exceeds the configured retention time or size threshold. This means that the cost of Kafka storage is a function of data volume, retention, and replication factor. Data volume growth is usually organic in nature, while retention and replication factor are often rigid and necessary for non-negotiables such as fault tolerance and recovery. When faced with growth in any of those variables, horizontal or vertical scaling of Kafka clusters is required to support higher storage demands.

Traditionally, horizontal scaling of Kafka clusters involved adding more brokers to the cluster in order to increase the total storage capacity, while vertical scaling involved replacing existing brokers with new ones that have higher storage capacity. This meant that the total storage cost of scaling up Kafka clusters grew as a factor of the per-unit storage cost to store data on broker disk. This fact is evident through the following equations:

totalCost = costPerGB * totalGB * replicationFactor

totalGB = GBperSecond * retentionSeconds

Substituting the second equation into the first results in:

totalCost = costPerGB * GBperSecond * retentionSeconds * replicationFactor

As mentioned previously, we usually do not have control over throughput (GBperSecond), retention (retentionSeconds), or replication factor. Therefore, lowering the total cost of storage is most effectively achieved by reducing the per-unit storage cost (costPerGB). Tiered Storage achieves this by offloading data from expensive broker disks to a much cheaper remote storage.
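As a purely illustrative example (hypothetical numbers, not Pinterest's actual throughput or pricing), consider a cluster ingesting 1 GB/s with 3 days of retention, a replication factor of 3, and a broker disk cost of $0.10 per GB-month:

totalGB = 1 GB/s * 259,200 s (3 days) = 259,200 GB

totalCost = $0.10/GB-month * 259,200 GB * 3 ≈ $77,760 per month

Since throughput, retention, and replication factor are effectively fixed by the use case, cutting costPerGB (for example, by moving segments to object storage) is the only lever that proportionally cuts the total.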

The native Tiered Storage offering in Apache Kafka®️ 3.6.0+ incorporates its features into the broker process itself, resulting in an inseparable coupling between Tiered Storage and the broker. While the tight coupling approach allows the native implementation of Tiered Storage to access Kafka internal protocols and metadata for a highly coordinated design, it also comes with limitations in realizing the full potential of Tiered Storage. Most notably, integrating Tiered Storage into the broker process means that the broker is always in the active serving path during consumption. This leaves behind the opportunity to leverage the remote storage system as a secondary serving path.

To address this, we applied the MemQ design pattern to Tiered Storage for Apache Kafka®️ by decoupling Tiered Storage from the broker, allowing for direct consumption from the remote storage. This delegates the active serving path to the remote storage, freeing up resources on the Kafka cluster beyond just storage and dramatically reducing the cost of serving. It also provides a higher degree of flexibility in adopting Tiered Storage and applying feature updates. The table below illustrates several key advantages of a broker-decoupled approach compared against the native implementation.

Figure 1: Architecture Overview

The implementation of broker-decoupled Tiered Storage consists of three main components:

  1. Segment Uploader: A continuous process that runs on each broker and uploads finalized log segments to a remote storage system
  2. Tiered Storage Consumer: A consumer client capable of reading data from both remote storage and local broker disk
  3. Remote Storage: A storage system that should have a per-unit storage cost lower than that of broker disk and supports the operations needed by the Segment Uploader and Tiered Storage Consumer

The diagram shown in Figure 1 depicts how the Segment Uploader and Tiered Storage Consumer interact with remote storage, as well as other relevant systems and processes. Let's dive deeper into each of these components.

Segment Uploader

Figure 2: Segment Uploader Architecture

The Segment Uploader is an independent process that runs as a sidecar on every broker in a Kafka cluster that has Tiered Storage enabled. Its primary responsibility is to upload finalized log segments for the partitions that the broker leads. Achieving this goal while remaining decoupled from the broker process requires designing these critical mechanisms:

  • Log directory monitoring
  • Leadership change detection
  • Fault tolerance

Log Directory Monitoring

The Segment Uploader's function is to upload data residing on its local broker's disk to a remote storage system. Because it runs independently of the broker process, the Segment Uploader monitors the broker file system for signals indicating that certain data files are ready for upload. Let's dive into how this monitoring mechanism works.

The Kafka broker process writes data to a local directory specified by the broker configuration log.dir. The directory structure and content of log.dir is maintained by the broker process and typically looks like the following:

<log.dir>
| - - - topicA-0
|       | - - - 00100.index
|       | - - - 00100.log
|       | - - - 00100.timeindex
|       | - - - 00200.index
|       | - - - 00200.log
|       | - - - 00200.timeindex
|       | - - - 00300.index ← active
|       | - - - 00300.log ← active
|       | - - - 00300.timeindex ← active
| - - - topicA-3
|       | - - - <omitted>
| - - - topicB-1
|       | - - - <omitted>
| - - - topicB-10
|       | - - - <omitted>

This directory contains all of the topic-partitions that this broker is a leader or follower for. The broker process writes the data it receives from producer applications into log segment files (files ending in .log), along with corresponding indexing metadata in .index and .timeindex files that allow for efficient lookups. The file names correspond to the earliest offset contained in the segment. Incoming data is written to the active log segment, which is the segment with the largest offset value.

The Segment Uploader places a file system watcher on the topic-partition directories that this broker currently leads and monitors events on those directories. File system events indicate to the Segment Uploader when a log segment is finalized (i.e. it is rotated and no longer receiving new data) and ready for upload. To do this, the Segment Uploader maintains a set of active log segments for each topic-partition that it is watching and uploads the most recently rotated segment upon detecting a rotation event.

In the example above, suppose that the broker is a leader for topicA-0 (topicA, partition 0), and 00300 is the active segment for topicA-0. Let's also suppose that 00100 and 00200 are already uploaded to the remote storage, and each log segment contains 100 offsets. The Kafka broker process will continue to write incoming data into 00300 until it reaches the configured size or time threshold specified by broker configurations. In most cases, segments are rotated via the size threshold, but time-based rotations can occur when the throughput is low enough that the size threshold cannot be reached before the topic's retention.ms configuration takes effect. Upon reaching either threshold, 00300 will be finalized and closed, while a new active segment (e.g. 00400) will be created. This triggers a file system event which notifies the Segment Uploader that 00300 is ready for upload. The Segment Uploader then enqueues all three files for 00300 for upload to remote storage and sets 00400 as the current active segment for topicA-0.

<log.dir>
| - - - topicA-0
|       | - - - 00100.index
|       | - - - 00100.log
|       | - - - 00100.timeindex
|       | - - - 00200.index
|       | - - - 00200.log
|       | - - - 00200.timeindex
|       | - - - 00300.index ← enqueue & upload
|       | - - - 00300.log ← enqueue & upload
|       | - - - 00300.timeindex ← enqueue & upload
|       | - - - 00400.index ← active
|       | - - - 00400.log ← active
|       | - - - 00400.timeindex ← active

Upon the successful completion of the uploads, the Segment Uploader writes an offset.wm file whose content is the last successfully uploaded offset for that topic-partition. This mechanism commits the progress of the Segment Uploader and allows it to recover from restarts and interruptions by resuming uploads only for segments that came after the last committed offset.
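A minimal sketch of this rotation-detection loop using Java's WatchService is shown below. The enqueueRotatedSegmentBefore helper is hypothetical, and the real Segment Uploader adds batching, retries, leadership awareness, and the offset.wm commit on top of this skeleton.

import java.io.IOException;
import java.nio.file.*;

public class SegmentRotationWatcher {
    private final Path partitionDir;   // e.g. <log.dir>/topicA-0

    public SegmentRotationWatcher(Path partitionDir) {
        this.partitionDir = partitionDir;
    }

    public void run() throws IOException, InterruptedException {
        WatchService watcher = FileSystems.getDefault().newWatchService();
        // A new .log file appearing means a new active segment was created,
        // i.e. the previous active segment was rotated.
        partitionDir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

        while (true) {
            WatchKey key = watcher.take();
            for (WatchEvent<?> event : key.pollEvents()) {
                Path created = partitionDir.resolve((Path) event.context());
                if (created.toString().endsWith(".log")) {
                    // The newly created file is the new active segment; the segment
                    // immediately preceding it is now finalized and safe to upload.
                    long newActiveOffset = parseBaseOffset(created);
                    enqueueRotatedSegmentBefore(newActiveOffset);
                }
            }
            key.reset();
        }
    }

    private long parseBaseOffset(Path segmentFile) {
        String name = segmentFile.getFileName().toString();
        return Long.parseLong(name.substring(0, name.indexOf('.')));
    }

    private void enqueueRotatedSegmentBefore(long newActiveOffset) {
        // Hypothetical: upload the .log/.index/.timeindex trio of the previous
        // active segment, then commit progress by writing offset.wm.
    }
}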

Leadership Change Detection

Decoupling the Segment Uploader from the broker process necessitates a reliable mechanism for discovering topic-partition leadership and detecting changes in real time. The Segment Uploader achieves this by monitoring the ZooKeeper endpoints for the Kafka cluster, which are updated and maintained by the Kafka cluster's controller.

Upon startup, the Segment Uploader bootstraps the current state by reading the /brokers/topics/<topic>/partitions/<partition>/state path in ZooKeeper, which informs the Segment Uploader about the topic-partitions that the broker is currently leading. The Segment Uploader correspondingly places file system watches on those topic-partition subdirectories in log.dir.

Leadership changes are reflected in real time under the same ZooKeeper path. When a broker becomes the leader of a topic-partition, it must already be an in-sync replica (ISR), which means that it is fully caught up on replicating from the original leader. This also means that the Segment Uploader can be confident that when the Kafka cluster controller updates the leadership metadata in ZooKeeper, the Segment Uploader on the new leader can immediately place a watch on the topic-partition directory, while the Segment Uploader on the previous leader can remove its watch.
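The sketch below illustrates this pattern with the plain ZooKeeper client: read the partition state znode, extract the leader, and re-register the one-shot watch on every change. The JSON parsing is simplified, error handling is omitted, and the watch placement/removal is left as comments; treat it as a conceptual sketch rather than the project's actual implementation.

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class LeadershipWatcher {
    private final ZooKeeper zk;
    private final int myBrokerId;

    public LeadershipWatcher(ZooKeeper zk, int myBrokerId) {
        this.zk = zk;
        this.myBrokerId = myBrokerId;
    }

    /** Watches /brokers/topics/<topic>/partitions/<partition>/state and
     *  re-registers the one-shot watch each time the state changes. */
    public void watchPartitionState(String topic, int partition) throws Exception {
        String statePath = String.format(
            "/brokers/topics/%s/partitions/%d/state", topic, partition);

        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    watchPartitionState(topic, partition); // ZooKeeper watches are one-shot
                } catch (Exception e) {
                    // handle / retry
                }
            }
        };

        byte[] data = zk.getData(statePath, watcher, null);
        int leader = parseLeaderFromStateJson(new String(data)); // {"leader":1001,...}
        if (leader == myBrokerId) {
            // place a file system watch on <log.dir>/<topic>-<partition>
        } else {
            // remove any existing file system watch for this topic-partition
        }
    }

    private int parseLeaderFromStateJson(String json) {
        // Simplified extraction of the "leader" field from the state znode JSON.
        int idx = json.indexOf("\"leader\":") + "\"leader\":".length();
        int end = idx;
        while (end < json.length() && Character.isDigit(json.charAt(end))) end++;
        return Integer.parseInt(json.substring(idx, end));
    }
}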

In Apache Kafka®️ 3.3+, ZooKeeper is replaced by KRaft. As such, the Segment Uploader will need to monitor leadership via KRaft when deployed alongside newer Kafka versions. Note that KRaft support in the Segment Uploader is currently under development.

Fault Tolerance

In Tiered Storage, missed uploads mean data loss. As such, the main objective of fault tolerance in the Segment Uploader is to guarantee the continuity of data in the remote storage by ensuring that there are no missed uploads. Because the Segment Uploader is decoupled from the broker process, designing it for fault tolerance requires special consideration.

The most common risk factors that might compromise data integrity of the uploaded log segments come primarily from the following areas:

  • Transient upload failures
  • Broker or Segment Uploader unavailability
  • Unclean leader election
  • Log segment deletion due to retention

Transient upload failures are usually mitigated by the Segment Uploader's retry mechanism. Broker or Segment Uploader downtime can be recovered from via the last committed offset recorded in offset.wm, as described in the previous section. Unclean leader election results in data loss at the broker level, so we accept this as a built-in risk when unclean leader election is enabled.

The most interesting problem is the last one: how can the Segment Uploader prevent missed uploads when log segment management and deletion are performed independently by the broker process?

The decoupling between the Segment Uploader and the broker process means that the Segment Uploader must upload every rotated log segment before it is cleaned up by Kafka retention policies, and the mechanism to do so must rely on signals outside of the Kafka internal protocol. Doing so while maintaining decoupling from the broker requires careful consideration of how log segments are managed by the broker.

Preventing Missed Uploads Due to Log Segment Deletion

To better understand how the Segment Uploader prevents missed uploads due to log segment deletion, let's first explore how a log segment is managed by the Kafka broker's LogManager. A given log segment stored on local Kafka broker disk goes through four phases in its lifecycle: active, rotated, staged for deletion, and deleted. The timing of when a log segment transitions between these phases is determined by data throughput, as well as several configuration values at the topic and broker levels. The diagram below explains the phase transitions visually:

Figure 3: Log Segment Lifecycle & Timing

The broker configuration log.segment.bytes (default 1GB) determines the size threshold for each log segment file. Once an active segment is filled to this threshold, the segment is rotated and a new active segment is created to accept subsequent writes. Thereafter, the rotated segment remains on the local broker's disk until the topic-level retention threshold of either retention.ms (time threshold) or retention.bytes (size threshold) is reached, whichever happens first. Upon reaching the retention threshold, the Kafka broker stages the segment for deletion by appending a ".deleted" suffix to the segment filename, and the segment remains in this state for the duration specified by the broker configuration log.segment.delete.delay.ms (default 60 seconds). Only after this does the segment get permanently deleted.

A log segment is only eligible for upload after it is rotated and can no longer be modified. When a log segment is deleted in the final phase, it is considered permanently lost. Therefore, the Segment Uploader must upload the log segment while it is in the rotated or staged-for-deletion phases. Under normal circumstances, the Segment Uploader uploads the most recently rotated log segment within one to two minutes of the segment's rotation, as it is immediately enqueued for upload upon rotation. As long as the topic retention is large enough for the Segment Uploader to have some buffer room for the upload, this typically does not present a problem. The diagram below compares the time sequences of the segment lifecycle and the triggering of uploads.

Figure 4: Log segment lifecycle vs. Segment Uploader uploads

This works well in practice provided that log segments are rotated on the size-based threshold defined by log.segment.bytes. However, in some cases, log segments can be rotated on a time-based threshold. This scenario occurs when a topic-partition receives so little traffic that it does not accumulate enough data to fill up log.segment.bytes within the configured retention.ms. In this situation, the broker rotates the segment and simultaneously stages it for deletion when the segment's last-modified timestamp is beyond the topic's retention.ms. This is illustrated in the following diagram:

Figure 5: Segment rotation due to time-based threshold

It is critical that the Segment Uploader is able to upload the segment during the time that it is in the staged-for-deletion phase, the duration of which is determined by the broker configuration log.segment.delete.delay.ms. Moreover, the segment filename upon rotation differs from the normal scenario due to the appended ".deleted" suffix, so attempting to upload the segment under its regular filename (without the suffix) results in a failed upload. As such, the Segment Uploader retries the upload with a ".deleted" suffix upon encountering a FileNotFoundException. Additionally, the broker configuration log.segment.delete.delay.ms should be adjusted to a slightly higher value (e.g. 5 minutes) to provide extra buffer room for the Segment Uploader to complete the upload.
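A simplified version of this fallback looks like the following, where uploadToRemote is a hypothetical helper standing in for the remote storage client:

import java.io.FileNotFoundException;
import java.nio.file.Path;

public class SegmentUploadTask {

    /** Uploads a rotated segment file, falling back to the ".deleted" name if the
     *  broker staged the segment for deletion between rotation and upload. */
    public void uploadWithDeletedFallback(Path segmentFile) throws Exception {
        try {
            uploadToRemote(segmentFile);
        } catch (FileNotFoundException e) {
            // The broker renamed the file (e.g. 00300.log -> 00300.log.deleted); retry
            // with the suffixed name before log.segment.delete.delay.ms expires.
            Path staged = segmentFile.resolveSibling(
                segmentFile.getFileName().toString() + ".deleted");
            uploadToRemote(staged);
        }
    }

    private void uploadToRemote(Path file) throws Exception {
        // Hypothetical: stream the file to the configured remote storage endpoint.
    }
}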

It is worth mentioning that the above scenario with low-volume topics is generally not a concern, because the benefits of Tiered Storage are most effectively achieved when applied to high-volume topics.

Tiered Storage Consumer

The Segment Uploader is only part of the story: data residing in remote storage is only useful if it can be read by consumer applications. Many advantages of a broker-decoupled approach (i.e. Pinterest Tiered Storage) are realized on the consumption side, such as bypassing the broker in the consumption path to save on compute and cross-AZ network transfer costs. Tiered Storage Consumer comes out-of-the-box with the capability of reading data from both remote storage and the broker in a way that is fully transparent to the user, bringing the theoretical benefits of broker-decoupled Tiered Storage from concept to reality.

Consumer Architecture

Figure 6: How Tiered Storage Consumer works

Tiered Storage Consumer is a client library that wraps the native KafkaConsumer client. It delegates operations to either the native KafkaConsumer or the RemoteConsumer depending on the desired serving path and where the requested data is stored. Tiered Storage Consumer accepts native KafkaConsumer configurations, with some additional properties that users can supply to specify the desired behavior of Tiered Storage Consumer. Most notably, the user should specify the mode of consumption, which is one of the following:

  • Remote Only: Bypass the broker during consumption and read directly and only from remote storage.
  • Kafka Only: Only read from Kafka brokers and never from remote storage (the same as a regular KafkaConsumer).
  • Remote Preferred: If the requested offset exists in both remote storage and the broker, read from remote storage.
  • Kafka Preferred: If the requested offset exists in both remote storage and the broker, read from the broker.

Figure 7: Consumption modes and serving path for requested offsets
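For illustration, configuring and constructing a consumer might look roughly like the sketch below. The "ts.consumer.*" property keys and the constructor shape are hypothetical stand-ins, so consult the open-source project for the actual configuration names.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TieredStorageConsumerExample {
    public static void main(String[] args) {
        // Illustrative only: the "ts.consumer.*" keys and the TieredStorageConsumer
        // constructor are hypothetical stand-ins for the open-source project's API.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "backfill-group");
        props.put("ts.consumer.mode", "REMOTE_PREFERRED");
        props.put("ts.consumer.endpoint.provider.class",
                  "com.example.MyStorageServiceEndpointProvider");

        TieredStorageConsumer<byte[], byte[]> consumer = new TieredStorageConsumer<>(props);
        consumer.subscribe(List.of("topicA"));
        // poll() then behaves like a regular KafkaConsumer, transparently switching
        // between broker and remote storage according to the configured mode.
    }
}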

This section will focus on the design and behavior of Tiered Storage Consumer when reading from remote storage.

Reading From Remote Storage

Consumer Group Management

Tiered Storage Consumer leverages the existing functionalities of Kafka's consumer group management, such as partition assignment (via subscription) and offset commits, by delegating these operations to the KafkaConsumer regardless of its consumption mode. This is the case even when consuming from remote storage. For example, a Tiered Storage Consumer in Remote Only mode can still participate in group management and offset commit mechanisms the same way that a regular KafkaConsumer does, even when the offsets are only available in the remote storage system and have already been cleaned up on the broker.

Storage Endpoint Discovery

When reading from remote storage, Tiered Storage Consumer needs to know where the data resides for the particular topic-partition that the consumer is attempting to read. The storage endpoint for any particular topic-partition is provided by the user-defined implementation of StorageServiceEndpointProvider, which should be shared between the Segment Uploader and the Tiered Storage Consumer. The StorageServiceEndpointProvider class name is provided in the Tiered Storage Consumer configurations, and its implementation deterministically constructs a remote storage endpoint for a given topic-partition. In practice, the same implementation of StorageServiceEndpointProvider should be packaged into the classpaths of both the Segment Uploader and the consumer application to guarantee that constructed endpoints are consistent between the two.

Figure 8: StorageServiceEndpointProvider usage
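As an illustration, an S3-backed endpoint provider could be sketched as follows. The real interface's method names and signatures in the open-source project may differ, so treat this as a sketch of the contract (topic-partition in, deterministic endpoint out) rather than the exact API.

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: the actual StorageServiceEndpointProvider interface may
// declare different method names and types.
public class S3StorageServiceEndpointProvider {

    private final String bucket = "my-bucket";
    private final String prefix = "custom-prefix";
    private final String clusterName = "kafkaCluster1";

    /** Deterministically maps a topic-partition to the remote location that the
     *  Segment Uploader writes to and the Tiered Storage Consumer reads from.
     *  A production implementation would also inject the prefix entropy hash
     *  described later in this post. */
    public String endpointFor(TopicPartition tp) {
        return String.format("s3://%s/%s/%s/%s-%d/",
            bucket, prefix, clusterName, tp.topic(), tp.partition());
    }
}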

With the remote endpoints constructed upon subscribe() or assign() calls, a Tiered Storage Consumer in its consumption loop delegates read responsibilities to either the KafkaConsumer or the RemoteConsumer, depending on the user-specified consumption mode and where the requested offsets reside. Reading from remote storage is possible in every consumption mode except Kafka Only.

Let's walk through the details of the Kafka Preferred and Remote Only consumption modes. Details for the other two consumption modes should be self-explanatory after understanding these two.

Kafka Preferred Consumption

When in Kafka Preferred consumption mode, Tiered Storage Consumer delegates read operations first to the native KafkaConsumer, then to the RemoteConsumer if the desired offsets do not exist on the Kafka broker. This mode allows consumer applications to read data in near real-time under normal circumstances, and to read earlier data from remote storage if the consumer is lagging beyond the earliest offsets on Kafka brokers.

Figure 9: Kafka Preferred consumption mode

When the requested offset is not on the Kafka broker, the KafkaConsumer's poll call throws a NoOffsetForPartitionException or OffsetOutOfRangeException. This is caught internally by the Tiered Storage Consumer in Kafka Preferred mode, which then delegates to the RemoteConsumer to try to find the data in remote storage. If the offset exists in remote storage, Tiered Storage Consumer returns those records to the application layer after fetching them directly from the remote storage system, skipping the broker when accessing the actual data.
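Conceptually, the delegation for Kafka Preferred mode looks like the sketch below. It is simplified: readFromRemote stands in for the RemoteConsumer internals, per-partition position tracking is omitted, and the exception-based fallback assumes automatic offset reset is effectively disabled.

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;

// Simplified view of the Kafka Preferred delegation logic; the real consumer
// tracks per-partition positions and merges results across serving paths.
public class KafkaPreferredPollSketch {

    ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> kafkaConsumer,
                                         Duration timeout) {
        try {
            // Normal case: the requested offsets still exist on the broker.
            return kafkaConsumer.poll(timeout);
        } catch (NoOffsetForPartitionException | OffsetOutOfRangeException e) {
            // The offsets were already cleaned up on the broker; fall back to the
            // remote storage serving path for the affected partitions.
            return readFromRemote(e.partitions());
        }
    }

    private ConsumerRecords<byte[], byte[]> readFromRemote(
            java.util.Set<org.apache.kafka.common.TopicPartition> partitions) {
        // Hypothetical: delegate to the RemoteConsumer, which fetches the log
        // segments containing the requested offsets directly from remote storage.
        return ConsumerRecords.empty();
    }
}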

Remote Only Consumption: Skip the Broker

When in Remote Only consumption mode, Tiered Storage Consumer only delegates the read operations to the RemoteConsumer. The RemoteConsumer directly requests data from the remote storage based on the constructed endpoints for the assigned topic-partitions, allowing it to avoid contacting the Kafka cluster during the consumption loop except for offset commits and group rebalances / partition assignments, which still rely on Kafka's internal consumer group and offset management protocols.

Figure 10: Remote Only consumption mode

When skipping the broker in Remote Only consumption mode, the full set of benefits of broker-decoupled Tiered Storage is realized. By rerouting the serving path to the remote storage system instead of the broker, consumer applications can perform historical backfills and read older data from remote storage while leveraging only the compute resources of the remote storage system, freeing up those resources on the Kafka broker. Simultaneously, depending on the pricing model of the remote storage system, cross-AZ network transfer costs can be avoided (e.g. Amazon does not charge for bandwidth between S3 and EC2 in the same region). This pushes the benefits of adopting Tiered Storage beyond just storage cost savings.

Remote Storage

The choice of the remote storage system backing Tiered Storage is critical to the success of its adoption. For this reason, special attention should be paid to the following areas when evaluating a remote storage system:

  • Interface compatibility and support for operations necessary to Tiered Storage
  • Pricing and mechanisms of data storage, transfer, replication, and lifecycle management
  • Scalability and partitioning

Interface Compatibility

At a minimum, the remote storage system should support the following generic operations over the network:

void putObject(byte[] object, Path path);

List<String> listObjectNames(Path path);

byte[] getObject(Path path);

Some additional operations are generally needed for improved performance and scalability. These include, but are not limited to:

Future putObjectAsync(byte[] object, Path path, Callback callback);

InputStream getObjectInputStream(Path path);

Naturally, in-place updates and modifications to uploaded log segments are unnecessary. For this reason, object storage systems are usually preferred for their scalability and cost benefits. An eligible and compatible remote storage system can be added to Pinterest Tiered Storage as long as it can implement the relevant interfaces in the Segment Uploader and Tiered Storage Consumer modules.
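As an example, the minimal operations above map naturally onto an object store; a rough S3 adapter using the AWS SDK for Java v2 might look like the sketch below (bucket name and key handling are illustrative, and it is not the project's actual module).

import java.util.List;
import java.util.stream.Collectors;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;

// Illustrative S3 adapter for the generic operations above; the real module's
// interface, pagination, and error handling are more involved.
public class S3RemoteStorage {
    private final S3Client s3 = S3Client.create();
    private final String bucket = "my-bucket";

    public void putObject(byte[] object, String key) {
        s3.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(),
                     RequestBody.fromBytes(object));
    }

    public List<String> listObjectNames(String prefix) {
        return s3.listObjectsV2(ListObjectsV2Request.builder()
                                    .bucket(bucket).prefix(prefix).build())
                 .contents().stream()
                 .map(S3Object::key)
                 .collect(Collectors.toList());
    }

    public byte[] getObject(String key) {
        return s3.getObjectAsBytes(GetObjectRequest.builder()
                                       .bucket(bucket).key(key).build())
                 .asByteArray();
    }
}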

Data Storage, Transfer, Replication, & Lifecycle Management

The purpose of adopting broker-decoupled Tiered Storage is to lower the cost and resource footprint of storage and serving on the Kafka cluster. Therefore, it is important to understand the technical mechanisms and pricing model of the remote storage system with respect to the operations offloaded from the Kafka cluster to the remote storage. These include data storage, transfer, replication, and lifecycle management.

Most of the common remote storage systems with widespread industry adoption have publicly available documentation and pricing for each of those operations. It is important to note that the savings and benefits that can be achieved with Tiered Storage adoption depend critically on a holistic evaluation of these aspects of the remote storage system, in tandem with case-specific factors such as data throughput, read / write patterns, desired availability and data consistency, locality and placement of services, etc.

Unlike the native implementation of Tiered Storage in KIP-405, the lifecycle management of data uploaded to remote storage in this broker-decoupled implementation is delegated to native mechanisms on the remote storage system. For this reason, the retention of uploaded data on remote storage should be configured according to the mechanisms available on the remote storage system of choice.

Scalability & Partitioning

Writing data to a remote storage system and serving consumption using its resources requires preparation for scale. The most common bottleneck on the remote storage system comes from compute resources, which are usually enforced via request rate limits. For example, Amazon S3®️ details its request rate limits for both reads and writes on a per partitioned prefix basis. For Tiered Storage to operate at large scale, it is critical that the remote storage is pre-partitioned in a thoughtful manner that evenly distributes request rates across the remote storage partitions in order to avoid hotspots and rate limiting errors.

Taking Amazon S3®️ as an example, partitioning is achieved via common prefixes between object keys. A bucket storing Tiered Storage log segments should ideally be pre-partitioned in a way that evenly distributes request rate load across partitions. To do so, the object keyspace must be designed in a way that allows for prefix-based partitioning, where each prefix-partition receives a request rate similar to every other prefix-partition.

In Tiered Storage, log segments uploaded to S3 generally adhere to a keyspace that looks like the following examples:

topicA-0
- - - -
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-0/00100.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-0/00200.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-0/00300.log

topicA-1
- - - -
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-1/00150.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-1/00300.log

topicB-0
- - - -
s3://my-bucket/custom-prefix/kafkaCluster1/topicB-0/01000.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicB-0/02000.log

The Kafka cluster name, topic name, and Kafka partition ID are all part of the key. This is so that the Tiered Storage Consumer can reconstruct the prefix based solely on these pieces of information when assigned to those topic-partitions. However, what if topicA receives much higher read and write traffic than topicB? The above keyspace scheme does not allow for S3 prefix partitioning in a way that evenly spreads out request rate load, and so the S3 prefix-partition custom-prefix/kafkaCluster1/topicA becomes a request rate hotspot.

The solution to this problem is to introduce prefix entropy into the keyspace in order to randomize the S3 prefix-partitions that host data across different topics and partitions. The concept of partitioning the remote storage via prefix entropy was introduced in MemQ and has been battle-tested in production for several years.

To support this in Tiered Storage, the Segment Uploader allows users to configure a value for ts.segment.uploader.s3.prefix.entropy.bits, which injects an N-digit MD5 binary hash into the object key. The hash is calculated from the combination of cluster name, topic name, and Kafka partition ID. Assuming that N=5, we get the following keys for the same examples above:

topicA-0
- - - -
s3://my-bucket/custom-prefix/01011/kafkaCluster1/topicA-0/00100.log
s3://my-bucket/custom-prefix/01011/kafkaCluster1/topicA-0/00200.log
s3://my-bucket/custom-prefix/01011/kafkaCluster1/topicA-0/00300.log

topicA-1
- - - -
s3://my-bucket/custom-prefix/01010/kafkaCluster1/topicA-1/00150.log
s3://my-bucket/custom-prefix/01010/kafkaCluster1/topicA-1/00300.log

topicB-0
- - - -
s3://my-bucket/custom-prefix/11100/kafkaCluster1/topicB-0/01000.log
s3://my-bucket/custom-prefix/11100/kafkaCluster1/topicB-0/02000.log

With this new keyspace, if topicA receives much higher request rates than other topics, the request load is spread evenly between different S3 prefix-partitions, assuming that each of its Kafka partitions receives relatively even read and write throughput. Applying this concept across a large number of Kafka clusters, topics, and partitions will statistically lead to an even distribution of request rates between S3 prefix-partitions.

When constructing a Tiered Storage Consumer, the user must supply the same N value as the Segment Uploader so that the consumer can reconstruct the correct key for each topic-partition it is assigned to.

Prefix-partitioning my-bucket with N=5

Resulting 32 prefix-partitions:
custom-prefix/00000
custom-prefix/00001
custom-prefix/00010
custom-prefix/00011
...
custom-prefix/11111
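For reference, the N-bit binary hash can be derived along the following lines; this is a simplified illustration, and the exact hashing and key-formatting logic in the Segment Uploader may differ.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Simplified illustration of deriving an N-bit binary prefix from the
// cluster / topic / partition combination; the project's exact scheme may differ.
public class PrefixEntropy {

    public static String prefixHash(String cluster, String topic, int partition, int n)
            throws NoSuchAlgorithmException {
        String key = cluster + "/" + topic + "-" + partition;
        byte[] md5 = MessageDigest.getInstance("MD5")
                                  .digest(key.getBytes(StandardCharsets.UTF_8));

        // Take the first n bits of the MD5 digest and render them as '0'/'1' characters.
        StringBuilder bits = new StringBuilder(n);
        for (int i = 0; i < n; i++) {
            int bit = (md5[i / 8] >> (7 - (i % 8))) & 1;
            bits.append(bit);
        }
        return bits.toString();
    }

    public static void main(String[] args) throws Exception {
        // Prints a 5-character binary prefix (the actual value depends on the hash).
        System.out.println(prefixHash("kafkaCluster1", "topicA", 0, 5));
    }
}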

Decoupling from the broker means that Tiered Storage feature additions can be rolled out and applied without needing to upgrade broker versions. Here are some of the features that are currently planned:

  • Integration with PubSub Client, a backend-agnostic client library
  • Integration with Apache Flink®️ (via PubSub Client integration)
  • Support for additional remote storage systems (e.g. HDFS)
  • Support for Parquet log segment storage format to enable real-time analytics (dependent on adoption of KIP-1008)

Pinterest Tiered Storage for Apache Kafka®️ is now open-sourced on GitHub. Check it out here! Feedback and contributions are welcome and encouraged.

The current state of Pinterest Tiered Storage for Apache Kafka®️ would not have been possible without significant contributions and support from Ambud Sharma, Shardul Jewalikar, and the Logging Platform team. Special thanks to Ang Zhang and Chunyan Wang for continuous guidance, feedback, and support.

Apache®️, Apache Kafka®️, Kafka®️, Apache Flink®️, and Flink®️ are trademarks of the Apache Software Foundation (https://www.apache.org/).
Amazon®️, AWS®️, S3®️, and EC2®️ are trademarks of Amazon.com, Inc. or its affiliates.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog and visit our Pinterest Labs site. To explore and apply to open roles, visit our Careers page.