package com.rms.blueprint.data;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataRetrievalWithBackoff implements Runnable
{
public final Logger LOGGER = LoggerFactory.getLogger(DataRetrievalWithBackoff.class);
private final Supplier<Long> pendingSupplier;
private final ObjLongConsumer<Long> readyConsumer;
private final Function<Long, Long> capacitySupplier;
private final long minLoadDurationInSeconds;
private final long capacity;
private final long maxBackoffDelayInSeconds;
/**
* @param config
* Configuration properties
* @param pendingSupplier
* Supplier that tells us how many items is being processed at
* this time
* @param readyConsumer
* Consumer that will be called when data is ready to load
* @param capacitySupplier
* Function to calculate current capacity
*/
private DataRetrievalWithBackoff(final long capacity, final long minLoadDurationInSeconds,
final long maxBackoffDelayInSeconds, final Supplier<Long> pendingSupplier,
final ObjLongConsumer<Long> readyConsumer, final Function<Long, Long> capacitySupplier)
{
Objects.requireNonNull(pendingSupplier);
Objects.requireNonNull(readyConsumer);
Objects.requireNonNull(capacitySupplier);
this.capacity = capacity;
this.minLoadDurationInSeconds = minLoadDurationInSeconds;
this.maxBackoffDelayInSeconds = maxBackoffDelayInSeconds;
this.pendingSupplier = pendingSupplier;
this.readyConsumer = readyConsumer;
this.capacitySupplier = capacitySupplier;
LOGGER.info(String.format("capacity ", capacity));
LOGGER.info(String.format("minLoadDurationInSeconds ", minLoadDurationInSeconds));
LOGGER.info(String.format("maxBackoffDelayInSeconds ", maxBackoffDelayInSeconds));
}
@Override
public void run()
{
LOGGER.info("Running Data Retrieval");
long lastLoadedTime = 0l;
int attempt = 0;
while (true)
{
if (Thread.currentThread().isInterrupted())
{
LOGGER.trace("Interrupted stopping [while]");
break;
}
final long delta = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastLoadedTime);
final long pending = pendingSupplier.get();
final long backoffTime = DataUtil.backoff(attempt,
maxBackoffDelayInSeconds,
minLoadDurationInSeconds / 2.0);
LOGGER.trace("Loading : lastLoaded : {} > {} delta(s) {} pending : {} backoffTime = {}",
new Object[] { lastLoadedTime, new Date(lastLoadedTime), delta, pending, backoffTime });
if (delta >= minLoadDurationInSeconds && pending <= capacitySupplier.apply(capacity))
{
LOGGER.info("Loading : lastLoaded : {} > {} delta(s) {} pending : {}",
new Object[] { lastLoadedTime, new Date(lastLoadedTime), delta, pending });
// let the consumer know that we are ready
readyConsumer.accept(backoffTime, attempt);
if (pending == 0)
++attempt;
else
attempt = 0;
lastLoadedTime = System.currentTimeMillis();
}
else
{
++attempt;
}
try
{
Thread.sleep(TimeUnit.SECONDS.toMillis(backoffTime));
}
catch (final InterruptedException e)
{
LOGGER.trace("Interrupted stopping [sleep]");
Thread.currentThread().interrupt();
break;
}
}
}
public static class Builder
{
private final Function<Long, Long> DEFAULT_CAPACITY_SUPPLIER = (capacity) -> capacity / 2;
private long minLoadDurationInSeconds = 60;
private long capacity = 100;
private long maxBackoffDelayInSeconds = 120;
private Supplier<Long> pendingSupplier;
private ObjLongConsumer<Long> readyConsumer;
private Function<Long, Long> capacitySupplier;
public Builder capacity(final long capacity)
{
this.capacity = capacity;
return this;
}
public Builder maxBackoffDelay(final long duration, final TimeUnit unit)
{
Objects.requireNonNull(unit);
this.maxBackoffDelayInSeconds = unit.toSeconds(duration);
return this;
}
public Builder minLoadDuration(final long duration, final TimeUnit unit)
{
Objects.requireNonNull(unit);
this.minLoadDurationInSeconds = unit.toSeconds(duration);
return this;
}
public Builder readyConsumer(final ObjLongConsumer<Long> readyConsumer)
{
Objects.requireNonNull(readyConsumer);
this.readyConsumer = readyConsumer;
return this;
}
public Builder capacitySupplier(final Function<Long, Long> capacitySupplier)
{
Objects.requireNonNull(capacitySupplier);
this.capacitySupplier = capacitySupplier;
return this;
}
public Builder pendingSupplier(final Supplier<Long> pendingSupplier)
{
Objects.requireNonNull(capacitySupplier);
this.pendingSupplier = pendingSupplier;
return this;
}
public DataRetrievalWithBackoff build()
{
// check invariant
Objects.requireNonNull(pendingSupplier, "Pening items supplier not provided");
Objects.requireNonNull(readyConsumer, "Ready Consumer not provided");
if (capacitySupplier == null)
capacitySupplier = DEFAULT_CAPACITY_SUPPLIER;
return new DataRetrievalWithBackoff(capacity,
minLoadDurationInSeconds,
maxBackoffDelayInSeconds,
pendingSupplier,
readyConsumer,
capacitySupplier == null ? DEFAULT_CAPACITY_SUPPLIER
: capacitySupplier);
}
}
} |