Class EntryProcessorExecutor<K,V>

java.lang.Object
com.loomcache.server.compute.EntryProcessorExecutor<K,V>
Type Parameters:
K - the type of the key
V - the type of the value

public class EntryProcessorExecutor<K,V> extends Object
Executor for processing map entries atomically using CacheEntryProcessor.

Provides four execution modes:
  - processEntry: process a single entry synchronously
  - processEntries: process multiple entries in parallel using StructuredTaskScope
  - executeOnAll: process all entries in the map in parallel
  - aggregate: scan entries with single-stage or two-stage map-reduce aggregation

Thread safety: Processing of each entry is atomic with respect to ordinary map mutations. The executor coordinates through a map-owned exact-key gate and the map transaction lock.

  • Constructor Details

    • EntryProcessorExecutor

      public EntryProcessorExecutor(DistributedMap<K,V> map)
      Create a new executor for the given map with default timeout (30 seconds).
      Parameters:
      map - the map to process entries in (must not be null)
      Throws:
      NullPointerException - if map is null
    • EntryProcessorExecutor

      public EntryProcessorExecutor(DistributedMap<K,V> map, long timeoutMillis)
      Create a new executor for the given map with custom timeout.
      Parameters:
      map - the map to process entries in (must not be null)
      timeoutMillis - timeout for processor execution in milliseconds
      Throws:
      NullPointerException - if map is null
      IllegalArgumentException - if timeoutMillis is not positive
  • Method Details

    • processEntry

      public <R extends @Nullable Object> @Nullable R processEntry(K key, CacheEntryProcessor<K, V, @NonNull R> processor)
      Execute a processor on a single key atomically.

      For mutable processors, the processor has exclusive access to the entry relative to map mutations. For read-only processors (ReadOnlyCacheEntryProcessor), a read-only entry view is used while the map mutation lock is held to provide an atomic snapshot. The result is returned synchronously.

      Type Parameters:
      R - the result type
      Parameters:
      key - the key to process (must not be null)
      processor - the processor to apply (must not be null)
      Returns:
      the result of processing
      Throws:
      NullPointerException - if key or processor is null
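
The atomic read-modify-write contract described for processEntry can be sketched with JDK types only. This is a minimal illustration, not LoomCache's implementation: it swaps the exact-key gate and transaction lock for ConcurrentHashMap.compute, and the Entry and Processor types are hypothetical stand-ins for the real entry view and CacheEntryProcessor.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in (not the LoomCache API): atomic processing of one key. */
final class SingleKeySketch {

    /** Hypothetical mutable entry view handed to the processor. */
    static final class Entry<K, V> {
        private final K key;
        private V value;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K getKey() { return key; }
        V getValue() { return value; }
        void setValue(V newValue) { this.value = newValue; }
    }

    /** Hypothetical processor shape: inspects or mutates the entry, returns a result. */
    interface Processor<K, V, R> {
        R process(Entry<K, V> entry);
    }

    static <K, V, R> R processEntry(ConcurrentHashMap<K, V> map, K key,
                                    Processor<K, V, R> processor) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(processor, "processor");
        AtomicReference<R> result = new AtomicReference<>();
        // compute() runs the lambda atomically for this key, so no other
        // update to the same key can interleave with the processor.
        map.compute(key, (k, current) -> {
            Entry<K, V> entry = new Entry<>(k, current);
            result.set(processor.process(entry));
            return entry.getValue(); // a null value removes the mapping
        });
        return result.get();
    }
}
```

In this sketch, a processor that reads the current value, writes an incremented one, and returns the previous value behaves like an atomic get-and-increment on that key.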
    • processEntries

      public <R extends @Nullable Object> Map<K,R> processEntries(Set<K> keys, CacheEntryProcessor<K, V, @NonNull R> processor)
      Execute a processor on multiple keys in parallel using virtual threads.

      Each key is processed independently and concurrently. Results are collected in the iteration order of the supplied key set. If execution exceeds the configured timeout, the call fails with a RuntimeException.

      Type Parameters:
      R - the result type
      Parameters:
      keys - the keys to process (must not be null)
      processor - the processor to apply (must not be null)
      Returns:
      an unmodifiable map of keys to processor results
      Throws:
      NullPointerException - if keys or processor is null
      RuntimeException - if any processor execution fails or times out
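
The fan-out behaviour described for processEntries (one concurrent task per key, results in key order, a shared timeout, an unmodifiable result) can be sketched as follows. This is an illustration under stated assumptions, not the class's implementation: it uses Executors.newVirtualThreadPerTaskExecutor() (Java 21+) in place of StructuredTaskScope, and processKeys, perKey, and timeoutMillis are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical stand-in (not the LoomCache API): per-key fan-out on virtual threads. */
final class ParallelKeysSketch {

    static <K, R> Map<K, R> processKeys(Set<K> keys, Function<K, R> perKey, long timeoutMillis) {
        List<K> ordered = new ArrayList<>(keys); // freeze the iteration order
        List<Callable<R>> tasks = new ArrayList<>();
        for (K key : ordered) {
            tasks.add(() -> perKey.apply(key));
        }
        Map<K, R> results = new LinkedHashMap<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll cancels any task still running when the timeout expires.
            List<Future<R>> futures =
                    executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (int i = 0; i < ordered.size(); i++) {
                results.put(ordered.get(i), futures.get(i).get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while processing keys", e);
        } catch (ExecutionException | CancellationException e) {
            // A failed task or a task cancelled by the timeout fails the whole call.
            throw new RuntimeException("processor failed or timed out", e);
        }
        return Collections.unmodifiableMap(results);
    }
}
```

A LinkedHashMap wrapped in Collections.unmodifiableMap is used here so that the result keeps the key order and tolerates null results; Map.copyOf would do neither.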
    • executeOnAll

      public <R extends @Nullable Object> Map<K,R> executeOnAll(CacheEntryProcessor<K, V, @NonNull R> processor)
      Execute a processor on all entries in the map in parallel using virtual threads.

      All entries are processed independently and concurrently. The result map contains one entry per processed key.

      Type Parameters:
      R - the result type
      Parameters:
      processor - the processor to apply (must not be null)
      Returns:
      an unmodifiable map of all keys to processor results
      Throws:
      NullPointerException - if processor is null
      RuntimeException - if any processor execution fails
    • aggregate

      public <R> R aggregate(Aggregator<K,V,R> aggregator)
      Scan all entries in the map and aggregate results using the provided aggregator.

      The aggregator accumulates state for each entry, and partial results are combined when using parallel execution.

      Type Parameters:
      R - the type of the aggregated result
      Parameters:
      aggregator - the aggregator to use for combining results (must not be null)
      Returns:
      the aggregated result
      Throws:
      NullPointerException - if aggregator is null
    • aggregate

      public <R> R aggregate(Supplier<? extends Aggregator<K,V,R>> aggregatorFactory)
      Run a two-stage map-reduce aggregation over all visible entries.

      The factory creates one independent aggregator per map stage. Partial aggregators are reduced left-to-right via Aggregator.combine(Aggregator).

      Type Parameters:
      R - the type of the aggregated result
      Parameters:
      aggregatorFactory - creates fresh aggregators for each map stage
      Returns:
      the aggregated result after all partials are combined
      Throws:
      NullPointerException - if aggregatorFactory is null or returns null
    • aggregate

      public <R> R aggregate(Supplier<? extends Aggregator<K,V,R>> aggregatorFactory, MapPredicate<K,V> predicate)
      Run a two-stage map-reduce aggregation over entries matching predicate.
      Type Parameters:
      R - the type of the aggregated result
      Parameters:
      aggregatorFactory - creates fresh aggregators for each map stage
      predicate - filters entries before accumulation
      Returns:
      the aggregated result after all partials are combined
      Throws:
      NullPointerException - if aggregatorFactory, predicate, or a produced aggregator is null
    • aggregate

      public <R> R aggregate(Supplier<? extends Aggregator<K,V,R>> aggregatorFactory, MapPredicate<K,V> predicate, int partitionCount)
      Run a two-stage map-reduce aggregation with an explicit maximum map-stage count.
      Type Parameters:
      R - the type of the aggregated result
      Parameters:
      aggregatorFactory - creates fresh aggregators for each map stage
      predicate - filters entries before accumulation
      partitionCount - maximum number of map-stage partitions
      Returns:
      the aggregated result after all partials are combined
      Throws:
      NullPointerException - if aggregatorFactory, predicate, or a produced aggregator is null
      IllegalArgumentException - if partitionCount is not positive
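
The two-stage aggregation described by the aggregate overloads can be sketched as below: the factory supplies one fresh aggregator per partition, each partition accumulates its matching entries, and the partials are then combined left to right. This is a simplified, single-threaded illustration with assumed names. The Agg interface (including accumulate and result), the round-robin partitioning, and the use of BiPredicate in place of MapPredicate are hypothetical; only combine is named in the documentation above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

/** Hypothetical stand-in (not the LoomCache API): two-stage map-reduce aggregation. */
final class MapReduceSketch {

    /** Hypothetical aggregator shape; method names other than combine are assumed. */
    interface Agg<K, V, R> {
        void accumulate(K key, V value);
        void combine(Agg<K, V, R> other);
        R result();
    }

    static <K, V, R> R aggregate(Map<K, V> map,
                                 Supplier<? extends Agg<K, V, R>> factory,
                                 BiPredicate<K, V> predicate,
                                 int partitionCount) {
        Objects.requireNonNull(factory, "factory");
        Objects.requireNonNull(predicate, "predicate");
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        List<Map.Entry<K, V>> entries = new ArrayList<>(map.entrySet());
        // partitionCount is a maximum: never create more partitions than entries.
        int partitions = Math.max(1, Math.min(partitionCount, entries.size()));

        // Map stage: one independent aggregator per partition (run sequentially here).
        List<Agg<K, V, R>> partials = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            Agg<K, V, R> partial = Objects.requireNonNull(factory.get(), "factory returned null");
            for (int i = p; i < entries.size(); i += partitions) {
                Map.Entry<K, V> e = entries.get(i);
                if (predicate.test(e.getKey(), e.getValue())) {
                    partial.accumulate(e.getKey(), e.getValue());
                }
            }
            partials.add(partial);
        }

        // Reduce stage: fold the partials left to right into the first one.
        Agg<K, V, R> total = partials.get(0);
        for (int p = 1; p < partitions; p++) {
            total.combine(partials.get(p));
        }
        return total.result();
    }

    /** Example aggregator: sums integer values into a long. */
    static final class SumAgg implements Agg<String, Integer, Long> {
        private long sum;
        @Override public void accumulate(String key, Integer value) { sum += value; }
        @Override public void combine(Agg<String, Integer, Long> other) { sum += other.result(); }
        @Override public Long result() { return sum; }
    }
}
```

Passing a factory rather than a single aggregator is what keeps the map stages independent: no partial ever shares mutable state with another until the reduce stage.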
    • project

      public <R> List<R> project(Projection<K,V,R> projection)
      Project all entries in the map using the provided projection function.

      Returns an unmodifiable list containing the projected result for each entry.

      Type Parameters:
      R - the type of the projected result
      Parameters:
      projection - the projection to apply to each entry (must not be null)
      Returns:
      an unmodifiable list of projected results (in no particular order)
      Throws:
      NullPointerException - if projection is null
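
The projection contract above (one projected value per entry, returned as an unmodifiable list with no guaranteed order) can be sketched with plain JDK types. This is an assumption-laden illustration: BiFunction stands in for the Projection interface, whose actual method names are not shown in this page.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

/** Hypothetical stand-in (not the LoomCache API): projecting every entry of a map. */
final class ProjectionSketch {

    static <K, V, R> List<R> project(Map<K, V> map, BiFunction<K, V, R> projection) {
        Objects.requireNonNull(projection, "projection");
        List<R> out = new ArrayList<>(map.size());
        for (Map.Entry<K, V> e : map.entrySet()) {
            out.add(projection.apply(e.getKey(), e.getValue()));
        }
        // Wrap rather than copy so that null projection results remain allowed.
        return Collections.unmodifiableList(out);
    }
}
```

Callers that need a stable order should sort the returned values themselves, since the order follows the map's iteration order.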