Class DistributedExecutorService

java.lang.Object
com.loomcache.server.executor.DistributedExecutorService

public class DistributedExecutorService extends Object
Named distributed executor service for submitting serializable tasks to cluster nodes.

Each executor instance is identified by a unique name, similar to Hazelcast's IExecutorService. Tasks are submitted as Callable instances, serialized via Kryo, and run on virtual threads for high-throughput concurrency.

Features

  • Virtual thread execution via Executors.newVirtualThreadPerTaskExecutor()
  • Per-task configurable timeout (default 30 seconds)
  • Full task lifecycle tracking: PENDING -> RUNNING -> COMPLETED/FAILED/CANCELLED/TIMED_OUT
  • Aggregate metrics: submitted, completed, failed, and cancelled counts, plus average execution time
  • Key-based routing and member-targeted submission
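
The lifecycle listed above can be read as a small state machine. The following is a minimal sketch, not the library's TaskStatus type: LifecycleSketch and its methods are hypothetical names, and the direct PENDING -> CANCELLED edge is an assumption taken from the description of cancel().

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the documented lifecycle; not the library's TaskStatus type.
final class LifecycleSketch {
    enum Status { PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, TIMED_OUT }

    /** States reachable in one step, as read from the documented arrows. */
    static Set<Status> next(Status from) {
        switch (from) {
            case PENDING:
                // Assumption: PENDING -> CANCELLED is allowed, because cancel() is
                // documented to cancel a pending task immediately.
                return EnumSet.of(Status.RUNNING, Status.CANCELLED);
            case RUNNING:
                return EnumSet.of(Status.COMPLETED, Status.FAILED,
                        Status.CANCELLED, Status.TIMED_OUT);
            default:
                return EnumSet.noneOf(Status.class); // terminal states have no successors
        }
    }

    static boolean canTransition(Status from, Status to) {
        return next(from).contains(to);
    }

    static boolean isTerminal(Status status) {
        return next(status).isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(canTransition(Status.PENDING, Status.RUNNING));
        System.out.println(canTransition(Status.COMPLETED, Status.RUNNING));
        System.out.println(isTerminal(Status.TIMED_OUT));
    }
}
```

A caller polling getStatus(taskId) could stop polling once a terminal state is observed.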

Thread Safety

All task tracking uses ConcurrentHashMap. Counters use AtomicLong. Shutdown is guarded by an AtomicBoolean flag.
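
To illustrate how these three primitives fit together, here is a minimal sketch using only JDK classes. TrackingSketch and its method names are hypothetical and the real class's internals may differ; the point is the pattern of a concurrent map for task state, an atomic counter for metrics, and a compare-and-set flag so that only the first shutdown call takes effect.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the tracking state described above.
final class TrackingSketch {
    private final Map<String, String> statusByTaskId = new ConcurrentHashMap<>();
    private final AtomicLong submitted = new AtomicLong();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    /** Registers a task and returns its UUID string, or rejects it after shutdown. */
    String register() {
        if (shutdown.get()) {
            throw new RejectedExecutionException("executor is shut down");
        }
        String taskId = UUID.randomUUID().toString();
        statusByTaskId.put(taskId, "PENDING");
        submitted.incrementAndGet();
        return taskId;
    }

    /** Returns true only for the call that actually flips the flag. */
    boolean shutdown() {
        return shutdown.compareAndSet(false, true);
    }

    long submittedCount() {
        return submitted.get();
    }

    String status(String taskId) {
        return statusByTaskId.get(taskId);
    }

    public static void main(String[] args) {
        TrackingSketch tracker = new TrackingSketch();
        String id = tracker.register();
        System.out.println(tracker.status(id) + " " + tracker.submittedCount());
        System.out.println(tracker.shutdown() + " " + tracker.shutdown());
    }
}
```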
Since:
2.0
  • Constructor Details

    • DistributedExecutorService

      public DistributedExecutorService(String name, int instanceNumber, KryoSerializer kryoSerializer)
      Create a new distributed executor service.
      Parameters:
      name - the executor name (must not be blank)
      instanceNumber - the node instance number for logging
      kryoSerializer - the Kryo serializer for task result serialization
    • DistributedExecutorService

      public DistributedExecutorService(String name, int instanceNumber, KryoSerializer kryoSerializer, long defaultTimeoutMillis)
      Create a new distributed executor service with a custom default timeout.
      Parameters:
      name - the executor name (must not be blank)
      instanceNumber - the node instance number for logging
      kryoSerializer - the Kryo serializer for task result serialization
      defaultTimeoutMillis - default task timeout in milliseconds (must be > 0)
    • DistributedExecutorService

      public DistributedExecutorService(String name, int instanceNumber, KryoSerializer kryoSerializer, Supplier<Collection<NodeInfo>> memberSupplier)
      Create a new distributed executor service with a member supplier for selector submissions.
      Parameters:
      name - the executor name (must not be blank)
      instanceNumber - the node instance number for logging
      kryoSerializer - the Kryo serializer for task result serialization
      memberSupplier - supplies the current cluster membership snapshot
    • DistributedExecutorService

      public DistributedExecutorService(String name, int instanceNumber, KryoSerializer kryoSerializer, long defaultTimeoutMillis, Supplier<Collection<NodeInfo>> memberSupplier)
      Create a new distributed executor service with custom timeout and member supplier.
      Parameters:
      name - the executor name (must not be blank)
      instanceNumber - the node instance number for logging
      kryoSerializer - the Kryo serializer for task result serialization
      defaultTimeoutMillis - default task timeout in milliseconds (must be > 0)
      memberSupplier - supplies the current cluster membership snapshot
  • Method Details

    • submit

      public <V> String submit(Callable<V> task)
      Submit a callable task for execution on a virtual thread.
      Type Parameters:
      V - the return type of the task
      Parameters:
      task - the callable task to execute (must not be null)
      Returns:
      the unique task ID (UUID string)
      Throws:
      RejectedExecutionException - if the executor has been shut down
    • submitToKey

      public <V> String submitToKey(Callable<V> task, String routingKey)
      Submit a callable task routed to the owner of the specified key.

      The routing key determines which cluster member executes the task, ensuring data locality for key-dependent operations.

      Type Parameters:
      V - the return type of the task
      Parameters:
      task - the callable task to execute (must not be null)
      routingKey - the key whose partition owner should execute the task
      Returns:
      the unique task ID (UUID string)
      Throws:
      RejectedExecutionException - if the executor has been shut down
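
How a routing key maps to a partition owner is not specified here. As an illustration only, the sketch below assumes hash-modulo partitioning with a hypothetical partition count of 16 and a round-robin partition-to-member assignment; KeyRoutingSketch and its methods are not part of the library.

```java
import java.util.List;

// Hypothetical routing model: key -> partition -> owning member.
final class KeyRoutingSketch {
    static final int PARTITION_COUNT = 16; // assumed value, for illustration only

    /** floorMod keeps the partition non-negative even for negative hash codes. */
    static int partitionOf(String routingKey) {
        return Math.floorMod(routingKey.hashCode(), PARTITION_COUNT);
    }

    /** Assumes partitions are assigned to members round-robin by list position. */
    static String ownerOf(String routingKey, List<String> memberIds) {
        if (memberIds.isEmpty()) {
            throw new IllegalArgumentException("no members available");
        }
        return memberIds.get(partitionOf(routingKey) % memberIds.size());
    }

    public static void main(String[] args) {
        List<String> members = List.of("n1", "n2", "n3");
        System.out.println(ownerOf("order-42", members));
        System.out.println(ownerOf("order-42", members)); // same key, same owner
    }
}
```

Whatever the actual mapping is, the property callers rely on is the one shown: the same key resolves to the same member while membership is unchanged.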
    • submitToMember

      public <V> String submitToMember(Callable<V> task, String memberId)
      Submit a callable task to a specific cluster member.
      Type Parameters:
      V - the return type of the task
      Parameters:
      task - the callable task to execute (must not be null)
      memberId - the target member ID
      Returns:
      the unique task ID (UUID string)
      Throws:
      RejectedExecutionException - if the executor has been shut down
    • submitToAll

      public <V> Map<String,String> submitToAll(Callable<V> task, Iterable<String> memberIds)
      Submit a callable task to each of the given cluster members (fan-out).

      Returns a map of member ID to task ID. In local-only mode, only the local node executes the task.

      Type Parameters:
      V - the return type of the task
      Parameters:
      task - the callable task to execute (must not be null)
      memberIds - the member IDs to fan out to
      Returns:
      map of member ID to task ID
      Throws:
      RejectedExecutionException - if the executor has been shut down
    • submit

      public <V> Map<String,String> submit(Callable<V> task, MemberSelector memberSelector)
      Submit a callable task to members selected from the configured membership snapshot.

      This mirrors Hazelcast's IExecutorService.submit(task, selector): the selector is evaluated against the membership currently reported by the executor's member supplier, and each selected member receives its own isolated task instance when fan-out requires it.

      Type Parameters:
      V - the return type of the task
      Parameters:
      task - the callable task to execute (must not be null)
      memberSelector - selects target members from the membership snapshot
      Returns:
      map of selected member ID to submitted task ID
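
The selector-based fan-out can be pictured as a filter over one membership snapshot. This is a sketch under stated assumptions: Member stands in for NodeInfo, a java.util.function.Predicate stands in for MemberSelector (whose actual method is not shown in this page), and no task is really executed.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-ins: Member for NodeInfo, Predicate<Member> for MemberSelector.
final class SelectorFanOutSketch {
    static final class Member {
        private final String id;
        private final String zone;

        Member(String id, String zone) {
            this.id = id;
            this.zone = zone;
        }

        String id() { return id; }
        String zone() { return zone; }
    }

    /** Reads the membership once, then assigns one new task ID per selected member. */
    static Map<String, String> submitSelected(Supplier<Collection<Member>> memberSupplier,
                                              Predicate<Member> selector) {
        Map<String, String> taskIdByMemberId = new LinkedHashMap<>();
        for (Member member : memberSupplier.get()) {
            if (selector.test(member)) {
                taskIdByMemberId.put(member.id(), UUID.randomUUID().toString());
            }
        }
        return taskIdByMemberId;
    }

    public static void main(String[] args) {
        List<Member> members = List.of(
                new Member("n1", "a"), new Member("n2", "b"), new Member("n3", "a"));
        Map<String, String> result = submitSelected(() -> members, m -> m.zone().equals("a"));
        System.out.println(result.keySet());
    }
}
```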
    • submitToMembers

      public <V> Map<String,String> submitToMembers(Callable<V> task, Iterable<NodeInfo> members, MemberSelector memberSelector)
      Submit a callable task to members selected from an explicit membership snapshot.
      Type Parameters:
      V - the return type of the task
      Parameters:
      task - the callable task to execute (must not be null)
      members - candidate members to evaluate
      memberSelector - selects target members from members
      Returns:
      map of selected member ID to submitted task ID
    • cancel

      public boolean cancel(String taskId)
      Cancel a task by its ID.

      If the task is PENDING, it transitions to CANCELLED immediately. If RUNNING, the executing thread is interrupted.

      Parameters:
      taskId - the task ID to cancel
      Returns:
      true if the task was found and cancellation was initiated
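
The two cancellation paths described above behave the same way as java.util.concurrent.Future.cancel, which the sketch below uses as a stand-in. CancelSketch is a hypothetical name, and a single-threaded JDK executor replaces the virtual-thread executor so that one task is guaranteed to wait behind another.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only illustration of the two documented cancellation paths.
final class CancelSketch {
    /** Cancels a task that is already running and reports whether it saw the interrupt. */
    static boolean cancelInterruptsRunningTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // blocks until interrupted
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();
            boolean initiated = future.cancel(true); // true = interrupt the worker
            boolean done = finished.await(5, TimeUnit.SECONDS);
            return initiated && done && interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    /** Cancels a task that is still queued and reports that it never ran. */
    static boolean cancelPendingNeverRuns() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            pool.submit(() -> { release.await(); return null; }); // occupies the only worker
            Future<?> queued = pool.submit(() -> ran.set(true));
            boolean cancelled = queued.cancel(false);
            release.countDown();
            pool.shutdown();
            boolean drained = pool.awaitTermination(5, TimeUnit.SECONDS);
            return cancelled && drained && !ran.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelInterruptsRunningTask());
        System.out.println(cancelPendingNeverRuns());
    }
}
```

Interruption is cooperative: a running task that never blocks and never checks its interrupt flag will keep running after cancellation has been initiated.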
    • getStatus

      public @Nullable TaskStatus getStatus(String taskId)
      Get the current status of a task.
      Parameters:
      taskId - the task ID to query
      Returns:
      the task status, or null if no task exists with that ID
    • getTaskInfo

      public @Nullable TaskInfo getTaskInfo(String taskId)
      Get the full task info for a task.
      Parameters:
      taskId - the task ID to query
      Returns:
      the task info, or null if no task exists with that ID
    • getResult

      public <V> @Nullable V getResult(String taskId)
      Get the deserialized result of a completed task.

      Returns null if the task does not exist, has not completed, or completed with a null result.

      Type Parameters:
      V - the expected result type
      Parameters:
      taskId - the task ID to query
      Returns:
      the deserialized result, or null if not available
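
Because a null return covers three different situations, callers may want to check the status before reading the result. The sketch below shows one way to separate the cases; ResultLookupSketch and the Outcome enum are hypothetical, and two plain maps stand in for getStatus and getResult.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that separates the three meanings of a null result.
final class ResultLookupSketch {
    enum Outcome { UNKNOWN_TASK, NOT_COMPLETED, COMPLETED_NULL, COMPLETED_VALUE }

    private final Map<String, String> statusById = new HashMap<>();
    private final Map<String, Object> resultById = new HashMap<>();

    void record(String taskId, String status, Object result) {
        statusById.put(taskId, status);
        resultById.put(taskId, result); // HashMap permits null values
    }

    /** Checks the status first, so a null result is only trusted for COMPLETED tasks. */
    Outcome classify(String taskId) {
        String status = statusById.get(taskId);
        if (status == null) {
            return Outcome.UNKNOWN_TASK;
        }
        if (!"COMPLETED".equals(status)) {
            return Outcome.NOT_COMPLETED;
        }
        return resultById.get(taskId) == null
                ? Outcome.COMPLETED_NULL
                : Outcome.COMPLETED_VALUE;
    }

    public static void main(String[] args) {
        ResultLookupSketch lookup = new ResultLookupSketch();
        lookup.record("done", "COMPLETED", 42);
        lookup.record("busy", "RUNNING", null);
        System.out.println(lookup.classify("done"));
        System.out.println(lookup.classify("busy"));
        System.out.println(lookup.classify("missing"));
    }
}
```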
    • schedule

      public <V> String schedule(Callable<V> task, long delay, TimeUnit unit)
      Schedule a callable to execute once after the given delay.
      Type Parameters:
      V - the return type
      Parameters:
      task - the callable task to execute
      delay - the delay before execution
      unit - the time unit of the delay
      Returns:
      the scheduled task ID
    • scheduleAtFixedRate

      public <V> String scheduleAtFixedRate(Callable<V> task, long initialDelay, long period, TimeUnit unit)
      Schedule a callable to execute repeatedly at a fixed rate.

      The period is measured from the start of each execution. If an execution takes longer than the period, the next execution starts immediately.

      Type Parameters:
      V - the return type
      Parameters:
      task - the callable task to execute
      initialDelay - the delay before the first execution
      period - the period between successive starts
      unit - the time unit for both delay and period
      Returns:
      the scheduled task ID
    • scheduleWithFixedDelay

      public <V> String scheduleWithFixedDelay(Callable<V> task, long initialDelay, long delay, TimeUnit unit)
      Schedule a callable to execute repeatedly with a fixed delay between the end of one execution and the start of the next.
      Type Parameters:
      V - the return type
      Parameters:
      task - the callable task to execute
      initialDelay - the delay before the first execution
      delay - the delay between end of one execution and start of next
      unit - the time unit for both delay values
      Returns:
      the scheduled task ID
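
The difference between fixed-rate and fixed-delay scheduling is easiest to see in the start times. The sketch below is a pure arithmetic model, not the scheduler itself: it assumes every run takes the same runTime, and it reads "starts immediately" as "starts as soon as the overrunning execution ends". ScheduleTimingSketch is a hypothetical name and all times are in the same arbitrary unit.

```java
import java.util.Arrays;

// Arithmetic model of the two repeating schedules; no threads involved.
final class ScheduleTimingSketch {
    /** Fixed rate: next start is one period after the previous start, or later on overrun. */
    static long[] fixedRateStarts(long initialDelay, long period, long runTime, int runs) {
        long[] starts = new long[runs];
        long start = initialDelay;
        for (int i = 0; i < runs; i++) {
            starts[i] = start;
            start += Math.max(period, runTime);
        }
        return starts;
    }

    /** Fixed delay: next start is the previous end plus the delay. */
    static long[] fixedDelayStarts(long initialDelay, long delay, long runTime, int runs) {
        long[] starts = new long[runs];
        long start = initialDelay;
        for (int i = 0; i < runs; i++) {
            starts[i] = start;
            start += runTime + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fixedRateStarts(0, 10, 3, 4)));
        System.out.println(Arrays.toString(fixedRateStarts(0, 10, 15, 4)));
        System.out.println(Arrays.toString(fixedDelayStarts(0, 10, 3, 4)));
    }
}
```

With a period of 10 and a run time of 3, fixed rate starts at 0, 10, 20, 30 while fixed delay starts at 0, 13, 26, 39, so fixed-delay schedules drift by the run time on every cycle.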
    • cancelScheduled

      public boolean cancelScheduled(String taskId)
      Cancel a scheduled task by ID, preventing further executions.
      Parameters:
      taskId - the scheduled task ID
      Returns:
      true if the task was found and cancelled
    • getScheduledTaskInfo

      public @Nullable ScheduledTaskInfo getScheduledTaskInfo(String taskId)
      Get info about a scheduled task.
      Parameters:
      taskId - the scheduled task ID
      Returns:
      the scheduled task info, or null if not found
    • activeScheduledTaskCount

      public int activeScheduledTaskCount()
      Get the count of active scheduled tasks.
      Returns:
      the number of active scheduled tasks
    • submitDurable

      public <V> String submitDurable(Callable<V> task)
      Submit a durable task that is persisted before execution.

      The callable is serialized via Kryo and recorded as a DurableTaskRecord. The task must be a concrete named class, because lambdas and anonymous classes cannot be Kryo-serialized. Register the class with KryoSerializer.registerClass().

      In a full Raft-integrated setup, the record is committed to the Raft log before execution begins. On leader change, pending durable tasks are re-executed by calling recoverPendingDurableTasks().

      Type Parameters:
      V - the return type
      Parameters:
      task - the callable to execute durably (must be a registered Kryo class)
      Returns:
      the durable task ID
      Throws:
      IllegalArgumentException - if the callable cannot be serialized
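
The named-class requirement can be checked before submission. The sketch below is a heuristic pre-check, not the library's validation: DurableGuardSketch, looksDurable, and AddTask are hypothetical names, and the check relies on lambda classes being flagged synthetic by the JDK. It does not verify Kryo registration.

```java
import java.util.concurrent.Callable;

// Hypothetical pre-check for tasks intended for durable submission.
final class DurableGuardSketch {
    /** A concrete named task class of the kind the documentation asks for. */
    static final class AddTask implements Callable<Integer> {
        private final int left;
        private final int right;

        AddTask(int left, int right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public Integer call() {
            return left + right;
        }
    }

    /** Heuristic: rejects lambdas (synthetic classes), anonymous classes, and local classes. */
    static boolean looksDurable(Callable<?> task) {
        Class<?> type = task.getClass();
        return !type.isSynthetic() && !type.isAnonymousClass() && !type.isLocalClass();
    }

    public static void main(String[] args) {
        Callable<Integer> lambda = () -> 1;
        System.out.println(looksDurable(new AddTask(1, 2)));
        System.out.println(looksDurable(lambda));
    }
}
```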
    • recoverPendingDurableTasks

      public int recoverPendingDurableTasks()
      Recover and re-execute pending durable tasks after a leader change or restart.

      Called by the consensus layer when this node becomes the leader. Any durable task in PENDING or RUNNING state is deserialized and re-executed.

      Returns:
      the number of tasks recovered
    • getDurableTaskRecord

      public @Nullable DurableTaskRecord getDurableTaskRecord(String taskId)
      Get the durable task record for a task.
      Parameters:
      taskId - the task ID
      Returns:
      the durable task record, or null if not found
    • pendingDurableTaskCount

      public int pendingDurableTaskCount()
      Get the count of pending durable tasks (needing recovery).
      Returns:
      count of recoverable durable tasks
    • toSnapshot

      public HashMap<String, @Nullable Object> toSnapshot()
      Creates a serializable snapshot of completed/failed task results and metrics.

      Only terminal tasks (COMPLETED, FAILED) are included. Running and pending tasks are abandoned on crash: they cannot be resumed because the in-flight Callable references are not serializable across restarts.

      Returns:
      a HashMap suitable for Kryo serialization within the Raft snapshot
    • restoreFromSnapshot

      public void restoreFromSnapshot(HashMap<String,Object> snapshot)
      Restores task results and metrics from a Raft snapshot.

      Clears all current task state and replaces it with the snapshot contents. Only terminal task results are restored (running/pending tasks are not snapshotted and are therefore lost on crash).

      Parameters:
      snapshot - the snapshot data previously produced by toSnapshot()
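
The snapshot round trip can be sketched as a filter on the way out and a clear-then-replace on the way in. This is an illustration only: SnapshotSketch is a hypothetical class, the snapshot layout (task ID mapped to a status string) is assumed, and the real snapshot also carries results and metrics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of toSnapshot() / restoreFromSnapshot() for task statuses only.
final class SnapshotSketch {
    private final Map<String, String> statusById = new ConcurrentHashMap<>();

    void put(String taskId, String status) {
        statusById.put(taskId, status);
    }

    String status(String taskId) {
        return statusById.get(taskId);
    }

    /** Copies only COMPLETED and FAILED entries; in-flight tasks are left out. */
    HashMap<String, Object> toSnapshot() {
        HashMap<String, Object> snapshot = new HashMap<>();
        statusById.forEach((taskId, status) -> {
            if (status.equals("COMPLETED") || status.equals("FAILED")) {
                snapshot.put(taskId, status);
            }
        });
        return snapshot;
    }

    /** Discards all current state, then loads the snapshot contents. */
    void restoreFromSnapshot(HashMap<String, Object> snapshot) {
        statusById.clear();
        for (Map.Entry<String, Object> entry : snapshot.entrySet()) {
            statusById.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
    }

    public static void main(String[] args) {
        SnapshotSketch source = new SnapshotSketch();
        source.put("t1", "COMPLETED");
        source.put("t2", "RUNNING");
        SnapshotSketch target = new SnapshotSketch();
        target.restoreFromSnapshot(source.toSnapshot());
        System.out.println(target.status("t1") + " " + target.status("t2"));
    }
}
```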
    • shutdown

      public void shutdown()
      Shut down this executor service.

      No new tasks will be accepted after shutdown. Running tasks are allowed to complete. The virtual thread pool and timeout scheduler are shut down.

    • isShutdown

      public boolean isShutdown()
      Check whether this executor has been shut down.
      Returns:
      true if shutdown has been called
    • getStats

      public ExecutorStats getStats()
      Get aggregate statistics for this executor.
      Returns:
      immutable snapshot of executor metrics