Data retrieval service with exponential backoff

Here we will create a data retrieval service that uses the exponential backoff strategy covered in the previous post.

Implementation

 
package com.rms.blueprint.data;
 
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;
import java.util.function.Supplier;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class DataRetrievalWithBackoff implements Runnable
{
    public final Logger LOGGER = LoggerFactory.getLogger(DataRetrievalWithBackoff.class);
 
    private final Supplier<Long> pendingSupplier;
 
    private final ObjLongConsumer<Long> readyConsumer;
 
    private final Function<Long, Long> capacitySupplier;
 
    private final long minLoadDurationInSeconds;
 
    private final long capacity;
 
    private final long maxBackoffDelayInSeconds;
 
    /**
     * @param config
     *            Configuration properties
     * @param pendingSupplier
     *            Supplier that tells us how many items is being processed at
     *            this time
     * @param readyConsumer
     *            Consumer that will be called when data is ready to load
     * @param capacitySupplier
     *            Function to calculate current capacity
     */
    private DataRetrievalWithBackoff(final long capacity, final long minLoadDurationInSeconds,
            final long maxBackoffDelayInSeconds, final Supplier<Long> pendingSupplier,
            final ObjLongConsumer<Long> readyConsumer, final Function<Long, Long> capacitySupplier)
    {
        Objects.requireNonNull(pendingSupplier);
        Objects.requireNonNull(readyConsumer);
        Objects.requireNonNull(capacitySupplier);
 
        this.capacity = capacity;
        this.minLoadDurationInSeconds = minLoadDurationInSeconds;
        this.maxBackoffDelayInSeconds = maxBackoffDelayInSeconds;
 
        this.pendingSupplier = pendingSupplier;
        this.readyConsumer = readyConsumer;
        this.capacitySupplier = capacitySupplier;
 
        LOGGER.info(String.format("capacity ", capacity));
        LOGGER.info(String.format("minLoadDurationInSeconds ", minLoadDurationInSeconds));
        LOGGER.info(String.format("maxBackoffDelayInSeconds ", maxBackoffDelayInSeconds));
 
    }
 
    @Override
    public void run()
    {
        LOGGER.info("Running Data Retrieval");
 
        long lastLoadedTime = 0l;
        int attempt = 0;
 
        while (true)
        {
            if (Thread.currentThread().isInterrupted())
            {
                LOGGER.trace("Interrupted stopping [while]");
                break;
            }
 
            final long delta = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastLoadedTime);
            final long pending = pendingSupplier.get();
 
            final long backoffTime = DataUtil.backoff(attempt,
                                                      maxBackoffDelayInSeconds,
                                                      minLoadDurationInSeconds / 2.0);
 
            LOGGER.trace("Loading : lastLoaded : {} > {}  delta(s) {} pending : {} backoffTime  = {}",
                         new Object[] { lastLoadedTime, new Date(lastLoadedTime), delta, pending, backoffTime });
 
            if (delta >= minLoadDurationInSeconds && pending <= capacitySupplier.apply(capacity))
            {
                LOGGER.info("Loading : lastLoaded :  {} >  {}  delta(s) {} pending : {}",
                            new Object[] { lastLoadedTime, new Date(lastLoadedTime), delta, pending });
 
                // let the consumer know that we are ready
                readyConsumer.accept(backoffTime, attempt);
 
                if (pending == 0)
                    ++attempt;
                else
                    attempt = 0;
 
                lastLoadedTime = System.currentTimeMillis();
            }
            else
            {
                ++attempt;
            }
 
            try
            {
 
                Thread.sleep(TimeUnit.SECONDS.toMillis(backoffTime));
            }
            catch (final InterruptedException e)
            {
                LOGGER.trace("Interrupted stopping  [sleep]");
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
 
    public static class Builder
    {
        private final Function<Long, Long> DEFAULT_CAPACITY_SUPPLIER = (capacity) -> capacity / 2;
 
        private long minLoadDurationInSeconds = 60;
 
        private long capacity = 100;
 
        private long maxBackoffDelayInSeconds = 120;
 
        private Supplier<Long> pendingSupplier;
 
        private ObjLongConsumer<Long> readyConsumer;
 
        private Function<Long, Long> capacitySupplier;
 
        public Builder capacity(final long capacity)
        {
            this.capacity = capacity;
            return this;
        }
 
        public Builder maxBackoffDelay(final long duration, final TimeUnit unit)
        {
            Objects.requireNonNull(unit);
            this.maxBackoffDelayInSeconds = unit.toSeconds(duration);
            return this;
        }
 
        public Builder minLoadDuration(final long duration, final TimeUnit unit)
        {
            Objects.requireNonNull(unit);
            this.minLoadDurationInSeconds = unit.toSeconds(duration);
            return this;
        }
 
        public Builder readyConsumer(final ObjLongConsumer<Long> readyConsumer)
        {
            Objects.requireNonNull(readyConsumer);
            this.readyConsumer = readyConsumer;
            return this;
        }
 
        public Builder capacitySupplier(final Function<Long, Long> capacitySupplier)
        {
            Objects.requireNonNull(capacitySupplier);
            this.capacitySupplier = capacitySupplier;
            return this;
        }
 
        public Builder pendingSupplier(final Supplier<Long> pendingSupplier)
        {
            Objects.requireNonNull(capacitySupplier);
            this.pendingSupplier = pendingSupplier;
            return this;
        }
 
        public DataRetrievalWithBackoff build()
        {
            // check invariant
            Objects.requireNonNull(pendingSupplier, "Pening items supplier not provided");
            Objects.requireNonNull(readyConsumer, "Ready Consumer not provided");
 
            if (capacitySupplier == null)
                capacitySupplier = DEFAULT_CAPACITY_SUPPLIER;
 
            return new DataRetrievalWithBackoff(capacity,
                                                minLoadDurationInSeconds,
                                                maxBackoffDelayInSeconds,
                                                pendingSupplier,
                                                readyConsumer,
                                                capacitySupplier == null ? DEFAULT_CAPACITY_SUPPLIER
                                                    : capacitySupplier);
        }
    }
}

Usage

 
   //@formatter:off
            // Build the service through its fluent Builder. No capacitySupplier is
            // given here, so build() uses the builder's default (capacity / 2).
            // getNumberOfPendingItemsToProcess(), fire(...) and DataLoadEvent are
            // application code defined elsewhere - not shown in this post.
            final DataRetrievalWithBackoff service = new DataRetrievalWithBackoff.Builder()
                    // value fed to the capacity function to get the pending-items threshold
                    .capacity(1000)
                    // upper bound passed to the backoff calculation
                    .maxBackoffDelay(100, TimeUnit.SECONDS)
                    // at least 10 seconds must pass between two ready signals
                    .minLoadDuration(10, TimeUnit.SECONDS)
                    // polled on every loop iteration for the current pending count
                    .pendingSupplier(() -> getNumberOfPendingItemsToProcess())
                    // called with (backoff seconds, attempt) when it is time to load
                    .readyConsumer((time, attempt) -> fire(new DataLoadEvent()))
                .build(); 
           //@formatter:on

Leave a Comment

Your email address will not be published. Required fields are marked *