The Producer/Consumer APIs

In Getting Started we encountered basic usage of the HollowProducer and HollowConsumer APIs.
That basic usage relies on default behavior, which may be customized to better suit your purposes. A more in-depth exploration of the customizable features of these APIs follows.

The HollowProducer

Generally, a producer runs a repeating cycle. At the end of each cycle, the producer has created a data state, published the artifacts necessary for consumers to bring their in-memory data stores to that data state, and announced the availability of the state.

The HollowProducer encapsulates the details of publishing, announcing, validating, and (if necessary) rolling back data states. In order to accomplish this, a few infrastructure hooks should be injected:

HollowProducer
   .withPublisher(publisher)                     /// required: a BlobPublisher
   .withAnnouncer(announcer)                     /// optional: an Announcer
   .withValidators(validators)                   /// optional: one or more Validator
   .withListeners(listeners)                     /// optional: one or more HollowProducerListeners
   .withBlobStagingDir(dir)                      /// optional: a java.io.File
   .withBlobCompressor(compressor)               /// optional: a BlobCompressor
   .withBlobStager(stager)                       /// optional: a BlobStager
   .withSnapshotPublishExecutor(e)               /// optional: a java.util.concurrent.Executor
   .withNumStatesBetweenSnapshots(n)             /// optional: an int
   .withVersionMinter(versionMinter)             /// optional: a VersionMinter
   .withTargetMaxTypeShardSize(size)             /// optional: a long
   .withBlobStorageCleaner(blobStorageCleaner)   /// optional: a BlobStorageCleaner
   .withMetricsCollector(hollowMetricsCollector) /// optional: a HollowMetricsCollector<HollowProducerMetrics>
   .build();

Let's examine each of the configurations injected into the HollowProducer:

  • BlobPublisher: Implementations of this class define how to publish blob data to the blob store.
  • Announcer: Implementations of this class define the announcement mechanism, which is used to track the version of the currently announced state.
  • Validator: Implementations of this class allow for semantic validation of the data contained in a state prior to announcement. If an Exception is thrown during validation, the state will not be announced, and the producer will be automatically rolled back to the prior state.
  • HollowProducerListener: Listeners are notified about the progress and status of producer cycles throughout the various cycle stages.
  • Blob staging directory: Before blobs are published, they must be written and inspected/validated. A directory may be specified as a File to which these "staged" blobs will be written prior to publish. Staged blobs will be cleaned up automatically after publish.
  • BlobCompressor: Implementations of this class intercept blob input/output streams to allow for compression in the blob store.
  • BlobStager: Implementations define how to stage blobs, if the default behavior of staging blobs on local disk is not desirable. If a custom BlobStager is provided, then neither a blob staging directory nor a BlobCompressor should be provided.
  • Snapshot publish Executor: When consumers start up, if the latest announced version does not have a snapshot, they can load an earlier snapshot and follow deltas to get up-to-date. A state can therefore be available and announced prior to the availability of the snapshot. If an Executor is supplied here, then it will be used to publish snapshots.
    This can be useful if snapshot publishing takes a long time -- subsequent cycles may proceed while snapshot uploads are still in progress.
  • Number of cycles between snapshots: Because snapshots are not necessary for a data state to be available and announced, they need not be published every cycle. If this parameter is specified, then a snapshot will be produced only every (n+1)th cycle.
  • VersionMinter: Allows for a custom version identifier minting strategy.
  • Target max type shard size: Specifies the target maximum size of a type shard. Defaults to 16MB.
  • BlobStorageCleaner: Implementations remove old snapshots and deltas from the blob store, which frees up storage and prevents it from running out of space.
  • HollowMetricsCollector: Implementing a HollowMetricsCollector allows you to store the producer's metrics or publish them to your preferred provider, such as Prometheus.
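
The snapshot cadence described in the list above can be pictured with a small counter. The following is a stand-alone model of the "every (n+1)th cycle" rule, not Hollow's internal logic. The class SnapshotCadence and its method are hypothetical names, and the model assumes that the very first cycle always publishes a snapshot.

```java
/** Stand-alone model of the snapshot cadence; hypothetical, not part of the Hollow API. */
final class SnapshotCadence {
    private final int numStatesBetweenSnapshots;
    private int statesSinceLastSnapshot;
    private boolean firstCycle = true;

    SnapshotCadence(int numStatesBetweenSnapshots) {
        if (numStatesBetweenSnapshots < 0) {
            throw new IllegalArgumentException("numStatesBetweenSnapshots must be >= 0");
        }
        this.numStatesBetweenSnapshots = numStatesBetweenSnapshots;
    }

    /** Call once per successful cycle; returns true if that cycle publishes a snapshot. */
    boolean nextCycleProducesSnapshot() {
        // Assumption of this model: the first cycle has no prior state, so it always snapshots.
        if (firstCycle || statesSinceLastSnapshot >= numStatesBetweenSnapshots) {
            firstCycle = false;
            statesSinceLastSnapshot = 0;
            return true;
        }
        statesSinceLastSnapshot++;
        return false;
    }

    public static void main(String[] args) {
        SnapshotCadence cadence = new SnapshotCadence(2);
        for (int cycle = 1; cycle <= 7; cycle++) {
            System.out.println("cycle " + cycle + " snapshot=" + cadence.nextCycleProducesSnapshot());
        }
    }
}
```

With n = 2, this model publishes snapshots on cycles 1, 4, and 7; the cycles in between publish deltas only.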

Each time a new data state should be produced, users should call .runCycle(Populator). See Getting Started for more basic usage details.

Restoring At Startup

Ideally the same HollowProducer would be held in memory forever, and runCycle() would be called every so often to produce a never-ending intact delta chain. However, this isn’t always possible; the producer will need to be restarted from time to time due to deployment or other operational circumstances.

In order to produce a delta between states produced by one HollowProducer and another, the producer can restore the prior state upon restart, which will allow a delta and reverse delta to be produced. See Restoring at Startup for usage.

Once we have restored the prior state, we can produce a delta from our producer's first cycle. The delta will be applicable to any consumers which are on the state from which we restored.

Initializing Before Restore

Before restoring, we must always initialize our data model. A HollowProducer's data model may be initialized:

  • via the HollowObjectMapper by calling initTypeState() with all top-level classes
  • via a set of schemas loaded from a text file using the HollowSchemaParser and HollowWriteStateCreator

Truncating a Delta Chain

If a problem occurs and you need to pin back consumers, you may want to restart your producer and explicitly restore from the pinned state. Once the producer's first cycle completes, it will publish a delta from the pinned state to the newly produced state, overwriting the previous delta from the pinned state. In this way, when you unpin, consumers will automatically follow the new delta, and the old forward-path from the pinned state will be truncated.

If any consumers do remain on a truncated state, the reverse delta out of the truncated chain is still intact: they can be manually pinned back to the restored state, then unpinned to get back up to date.

Rolling Back

While producing a new state, if the HollowProducer encounters an error during data state population, or if validation fails, the current data state will be aborted and the underlying state engine will be rolled back to the previous data state. Any delta produced on the next cycle will be from the last successful data state.
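
The rollback behavior can be illustrated with a stand-alone model. This is only a sketch under simplifying assumptions: RollbackModel is a hypothetical class, the data state is a plain list of strings, and "validation" is a single emptiness check. It is not how the HollowProducer is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Stand-alone model of cycle rollback; hypothetical, not a Hollow class. */
final class RollbackModel {
    private List<String> lastGoodState = Collections.emptyList();

    /** Runs one cycle. Returns true if the new state was committed, false if it was rolled back. */
    boolean runCycle(Consumer<List<String>> populator) {
        List<String> working = new ArrayList<>();
        try {
            populator.accept(working); // population may throw
            if (working.isEmpty()) {   // stand-in for a validation rule
                throw new IllegalStateException("validation failed: empty state");
            }
        } catch (RuntimeException e) {
            // The working copy is discarded; lastGoodState is left untouched.
            return false;
        }
        lastGoodState = Collections.unmodifiableList(working);
        return true;
    }

    /** The last successfully produced state, which is the base for the next delta. */
    List<String> currentState() {
        return lastGoodState;
    }

    public static void main(String[] args) {
        RollbackModel model = new RollbackModel();
        model.runCycle(state -> state.add("a"));
        model.runCycle(state -> { throw new RuntimeException("population error"); });
        System.out.println(model.currentState());
    }
}
```

After the failed second cycle, currentState() still returns the state committed by the first cycle.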

Validating Data

It likely makes sense to perform some basic validation on your produced data states before announcing them to clients.
If you provide one or more Validators to the HollowProducer, these will be automatically executed prior to announcement. Validation rules will be specific to the semantics of the dataset, and may include heuristic checks derived from expectations about the dataset. If your Validator throws an Exception, the HollowProducer will automatically roll back the state engine, and the next successful cycle will produce a delta from the prior successful state.
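
As an example of a heuristic rule, a check might reject a state whose record count differs too much from the previous state. The sketch below shows only the rule itself as plain Java. RecordCountCheck is a hypothetical class that does not implement Hollow's Validator interface; how the counts are obtained, and how the rule is wired into a Validator, is left out.

```java
/** Stand-alone heuristic check; hypothetical, does not implement Hollow's Validator interface. */
final class RecordCountCheck {
    private RecordCountCheck() {}

    /**
     * Throws if the record count changed by more than allowedChangePercent
     * relative to the previous state.
     */
    static void validate(long previousCount, long currentCount, double allowedChangePercent) {
        if (previousCount == 0) {
            return; // nothing to compare against, e.g. on the first cycle
        }
        double changePercent = 100.0 * Math.abs(currentCount - previousCount) / previousCount;
        if (changePercent > allowedChangePercent) {
            throw new IllegalStateException(
                "record count changed from " + previousCount + " to " + currentCount
                    + ", exceeding the allowed " + allowedChangePercent + "%");
        }
    }

    public static void main(String[] args) {
        validate(1000, 1040, 5.0); // within tolerance, returns normally
        try {
            validate(1000, 1100, 5.0); // outside tolerance, throws
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```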

Compacting Data

It is possible to produce delta chains which extend over many thousands of states. If during this delta chain an especially large delta happens for a specific type, it’s possible that many ordinal holes will be present in that type.
If over time multiple types go through especially large deltas, this can have an impact on a dataset’s heap footprint.

To reclaim heap space occupied by ordinal holes, a special compaction cycle can be run on the HollowProducer.
During compaction, no record data will change, but identical records will be relocated off of the high end of the ordinal space into the ordinal holes. This is accomplished by producing a new data state with no changes except for the more optimal ordinal assignments.
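
The relocation step can be sketched as follows. This is a simplified, stand-alone illustration: populated ordinals are modeled as a java.util.BitSet, and OrdinalCompactionSketch is a hypothetical class rather than Hollow's HollowCompactor, which additionally applies the configured compaction criteria.

```java
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone model of moving records from high ordinals into ordinal holes. */
final class OrdinalCompactionSketch {
    private OrdinalCompactionSketch() {}

    /**
     * Mutates the set of populated ordinals so that no holes remain, and returns
     * a map from old ordinal to new ordinal for every relocated record.
     */
    static Map<Integer, Integer> compact(BitSet populated) {
        Map<Integer, Integer> relocations = new LinkedHashMap<>();
        int hole = populated.nextClearBit(0);  // lowest unused ordinal
        int highest = populated.length() - 1;  // highest used ordinal, or -1 if empty
        while (highest >= 0 && hole < highest) {
            populated.clear(highest);
            populated.set(hole);
            relocations.put(highest, hole);
            hole = populated.nextClearBit(hole + 1);
            highest = populated.length() - 1;
        }
        return relocations;
    }

    public static void main(String[] args) {
        BitSet populated = new BitSet();
        populated.set(0);
        populated.set(3);
        populated.set(4);
        populated.set(7);
        System.out.println("relocations: " + compact(populated));
        System.out.println("populated after: " + populated);
    }
}
```

For populated ordinals 0, 3, 4, and 7, the sketch moves the record at ordinal 7 to ordinal 1 and the record at ordinal 4 to ordinal 2, leaving ordinals 0 through 3 populated.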

To run a compaction cycle, call runCompactionCycle(config) on the HollowProducer. If this method returns a valid version identifier, then a compaction cycle occurred and produced a new data state. If it returns Long.MIN_VALUE, then the compaction criteria specified in the CompactionConfig were not met and no action was taken. See the HollowCompactor javadoc for more details.

The Incremental HollowProducer

If the changes to be applied to a data state are known, an incremental producer can be used. After the first cycle, which must still supply the whole data state, each cycle need only provide the records that have changed (those added, modified, or removed) rather than all records. Each record must be associated with a primary key (the record's Java type must be annotated with @HollowPrimaryKey) so that the record's identity can be determined and the change calculated.

In all other respects an incremental HollowProducer behaves the same as a HollowProducer: it encapsulates the details of publishing, announcing, validating, and (if necessary) rolling back data states, and takes the same infrastructure hooks when built. Consumers are none the wiser as to whether production is full or incremental.

An incremental HollowProducer may be built in the same manner as a HollowProducer as follows:

HollowProducer.Incremental ip = HollowProducer
   .withPublisher(publisher)
   ... // further infrastructure hooks
   .buildIncremental();
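
Why a primary key is needed for incremental production can be seen in a small stand-alone model. The classes KeyedChangeSet and MovieRecord below are hypothetical and do not use the Hollow API. They only show that a supplied record can be matched against the existing state, and treated as an addition, a modification, or a removal, once its key is known.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical record type; in Hollow the key would be declared with @HollowPrimaryKey. */
final class MovieRecord {
    final int id;
    final String title;

    MovieRecord(int id, String title) {
        this.id = id;
        this.title = title;
    }
}

/** Stand-alone model of applying keyed changes to a data state. */
final class KeyedChangeSet<K, R> {
    private final Function<R, K> primaryKey;
    private final Map<K, R> state = new LinkedHashMap<>();

    KeyedChangeSet(Function<R, K> primaryKey) {
        this.primaryKey = primaryKey;
    }

    /** Adds the record, or replaces the existing record that has the same key. */
    void addOrModify(R record) {
        state.put(primaryKey.apply(record), record);
    }

    /** Removes the record with the given key, if present. */
    void delete(K key) {
        state.remove(key);
    }

    /** Returns a copy of the resulting data state. */
    Map<K, R> snapshot() {
        return new LinkedHashMap<>(state);
    }

    public static void main(String[] args) {
        KeyedChangeSet<Integer, MovieRecord> changes = new KeyedChangeSet<>(m -> m.id);
        changes.addOrModify(new MovieRecord(1, "First title"));
        changes.addOrModify(new MovieRecord(1, "Corrected title")); // same key: a modification
        System.out.println(changes.snapshot().get(1).title);
    }
}
```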

Primary key records

It is recommended that any top-level type be annotated with @HollowPrimaryKey regardless of full or incremental production, since this makes it easy for tooling to track changes.

The HollowConsumer

Data consumers keep their local copy of a dataset current by ensuring that their state engine is always at the latest announced data state. Consumers can arrive at a particular data state in a couple of different ways:

  • At initialization time, they will load a snapshot, which is an entire copy of the dataset to be forklifted into memory.
  • After initialization time, they will keep their local copy of the dataset current by applying delta transitions, which are the differences between adjacent data states.
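
The two kinds of transition can be combined into an update plan: start from the nearest snapshot, then follow deltas to the target version. The sketch below is a stand-alone model with hypothetical names (UpdatePlanSketch, plan). It assumes that version identifiers increase along the delta chain and that each state has at most one forward delta. It is not the planning logic Hollow itself uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;

/** Stand-alone model of reaching a target version via one snapshot plus deltas. */
final class UpdatePlanSketch {
    private UpdatePlanSketch() {}

    static List<String> plan(NavigableSet<Long> snapshotVersions,
                             Map<Long, Long> deltaFromTo,
                             long target) {
        // Pick the latest snapshot at or before the target version.
        Long start = snapshotVersions.floor(target);
        if (start == null) {
            throw new IllegalStateException("no snapshot at or before version " + target);
        }
        List<String> steps = new ArrayList<>();
        steps.add("snapshot " + start);
        long current = start;
        // Follow forward deltas until the target version is reached.
        while (current != target) {
            Long next = deltaFromTo.get(current);
            if (next == null || next <= current || next > target) {
                throw new IllegalStateException("delta chain broken at version " + current);
            }
            steps.add("delta " + current + " -> " + next);
            current = next;
        }
        return steps;
    }
}
```

For example, with snapshots at versions 1 and 4, a delta between each pair of adjacent versions from 1 to 6, and a target of 6, the plan is: snapshot 4, delta 4 -> 5, delta 5 -> 6.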

The HollowConsumer encapsulates the details of initializing and keeping a dataset up to date. In order to accomplish this task, a few infrastructure hooks should be injected:

HollowConsumer
   .withBlobRetriever(blobRetriever)              /// required: a BlobRetriever
   .withLocalBlobStore(localDiskDir)              /// optional: a local disk location
   .withAnnouncementWatcher(announcementWatcher)  /// optional: an AnnouncementWatcher
   .withRefreshListener(refreshListener)          /// optional: a RefreshListener
   .withGeneratedAPIClass(MyGeneratedAPI.class)   /// optional: a generated client API class
   .withFilterConfig(filterConfig)                /// optional: a HollowFilterConfig
   .withDoubleSnapshotConfig(doubleSnapshotCfg)   /// optional: a DoubleSnapshotConfig
   .withObjectLongevityConfig(objectLongevityCfg) /// optional: an ObjectLongevityConfig
   .withObjectLongevityDetector(detector)         /// optional: an ObjectLongevityDetector
   .withRefreshExecutor(refreshExecutor)          /// optional: an Executor
   .withMetricsCollector(hollowMetricsCollector)  /// optional: a HollowMetricsCollector<HollowConsumerMetrics>
   .build();

Let's examine each of the hooks injected into the HollowConsumer:

  • BlobRetriever: The interface to the blob store. This is the only hook for which a custom implementation is required. Each of the other hooks has a default implementation which may be used. The BlobRetriever may be omitted only if a previously-populated local blob store is specified.
  • Local blob store: A File which indicates where to record downloaded blobs and find previously downloaded blobs.
    If specified along with a BlobRetriever, the HollowConsumer will prefer to use previously downloaded blobs where applicable, and otherwise write newly downloaded blobs to the specified directory. If specified without a BlobRetriever, only previously downloaded blobs will be available.
  • AnnouncementWatcher: Provides an interface to the state announcement mechanism. Often, announcement polling logic is encapsulated inside implementations.
  • RefreshListener: Provides hooks so that actions may be taken during and after updates (e.g. indexing).
  • Generated API Class: Specifies a custom-generated Hollow API to use.
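  • HollowFilterConfig: Specifies which types and fields to include in, or exclude from, the consumer's in-memory copy of the dataset.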
  • DoubleSnapshotConfig: Defines advanced settings related to double snapshots.
  • ObjectLongevityConfig: Defines advanced settings related to object longevity.
  • ObjectLongevityDetector: Implementations are notified when stale hollow object existence and usage is detected.
  • RefreshExecutor: An Executor to use when asynchronous updates are called via triggerAsyncRefresh().
  • HollowMetricsCollector: Implementing a HollowMetricsCollector allows you to store the consumer's metrics or publish them to your preferred provider, such as Prometheus.

Each time the identifier of the currently announced state changes, triggerRefresh() should be called on the HollowConsumer. This will bring the data up to date.

In general, the only requirement for getting Hollow consumers to work with your specific infrastructure is to implement a BlobRetriever and AnnouncementWatcher, and use them with a HollowConsumer.

Triggering Refresh

When implementing an AnnouncementWatcher, you will need to implement the method subscribeToUpdates(HollowConsumer consumer). When you create a HollowConsumer with an AnnouncementWatcher, it will automatically call back to this method with itself as the argument.

You should track all HollowConsumers received by calls to this method. When your announcement mechanism provides an updated value, you should notify each HollowConsumer via the triggerAsyncRefresh() method.

In this way, a HollowConsumer injected with this AnnouncementWatcher implementation will be automatically kept up to date.
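
The subscribe-and-notify pattern described above can be sketched without the Hollow library. In this stand-alone model, RefreshTarget is a hypothetical stand-in for the one HollowConsumer method the watcher needs, and onAnnouncement is a hypothetical entry point that your announcement mechanism (a poller, a push notification, and so on) would call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the consumer; not Hollow's HollowConsumer. */
interface RefreshTarget {
    void triggerAsyncRefresh();
}

/** Stand-alone model of a watcher that tracks subscribers and notifies them. */
final class AnnouncementWatcherSketch {
    private final List<RefreshTarget> subscribers = new CopyOnWriteArrayList<>();
    private final AtomicLong latestVersion = new AtomicLong(Long.MIN_VALUE);

    /** Remembers every consumer that subscribes, so it can be notified later. */
    void subscribeToUpdates(RefreshTarget consumer) {
        subscribers.add(consumer);
    }

    long getLatestVersion() {
        return latestVersion.get();
    }

    /** Called by the (hypothetical) announcement mechanism when it observes a version. */
    void onAnnouncement(long version) {
        long previous = latestVersion.getAndSet(version);
        if (previous != version) {
            // Only notify when the announced version actually changed.
            for (RefreshTarget subscriber : subscribers) {
                subscriber.triggerAsyncRefresh();
            }
        }
    }
}
```

A thread-safe list is used here on the assumption that consumers may subscribe from one thread while announcements arrive on another.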

Dataset Consistency

If you have a long-running process which requires a consistent view of the dataset in a single state, you can prevent the HollowConsumer from updating while your process runs:

HollowConsumer consumer = ...

consumer.getRefreshLock().lock();
try {
    /// run your process
} finally {
    consumer.getRefreshLock().unlock();
}

The getRefreshLock() call returns the read lock of a ReadWriteLock. Refreshes use the write lock, so a refresh waits until every holder of the read lock has released it.
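
The interaction between the two locks can be demonstrated with the JDK's ReentrantReadWriteLock alone. RefreshLockSketch below is a hypothetical class, not the HollowConsumer. It uses a non-blocking tryLock() for the "refresh" purely so that the effect is observable from a single thread, whereas a real refresh would wait for the write lock.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Stand-alone model of a refresh guarded by a read/write lock. */
final class RefreshLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private long version;

    /** Plays the role of getRefreshLock(): readers hold this while they need a stable view. */
    Lock getRefreshLock() {
        return lock.readLock();
    }

    /** Attempts a refresh without blocking; returns false while any reader holds the lock. */
    boolean tryRefresh(long newVersion) {
        if (!lock.writeLock().tryLock()) {
            return false;
        }
        try {
            version = newVersion;
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    long version() {
        return version;
    }

    public static void main(String[] args) {
        RefreshLockSketch sketch = new RefreshLockSketch();
        sketch.getRefreshLock().lock();
        try {
            System.out.println("refresh while reading: " + sketch.tryRefresh(2L));
        } finally {
            sketch.getRefreshLock().unlock();
        }
        System.out.println("refresh after release: " + sketch.tryRefresh(2L));
    }
}
```

While the read lock is held, the refresh attempt is refused; once it is released, the same attempt succeeds.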