Write your first batch
Writing a batch mostly means writing a main(String...) method that uses Yupiik Batch components.
Create a batch
Implementing a batch means implementing the Batch interface:
public class MyBatch implements Batch<MyConf> {
@Override
public void accept(final MyConf configuration) {
// ...
}
}
Note that a batch has a configuration, and that this configuration is "injected" into the accept method. This enables the framework to map the main arguments to the configuration in a unified fashion.
Once you have a batch implementation, you can run it with the Batch.run launcher method:
public static void main(final String... args) {
Batch.run(MyBatch.class, args);
}
Define your batch configuration
A batch configuration is a class with field(s) decorated with @Param:
public class DataSourceConfiguration {
@Param(description = "Driver to use", required = true)
private String driver;
@Param(description = "JDBC URL to use", required = true)
private String url;
@Param(description = "Database username.")
private String username;
@Param(description = "Database password.")
private String password;
}
NOTE: Fields are injected, so they shouldn't be …
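To make the argument-to-configuration mapping more concrete, here is a minimal reflection-based sketch. It is not how Yupiik Batch implements it: the SketchParam annotation, the SketchBinder class and the `--name value` argument format are hypothetical, and only String fields are supported.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for @Param, retained at runtime so reflection can read it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SketchParam {
    String description();
    boolean required() default false;
}

// Hypothetical configuration mirroring DataSourceConfiguration (String fields only).
class SketchDataSourceConfiguration {
    @SketchParam(description = "Driver to use", required = true)
    private String driver;
    @SketchParam(description = "JDBC URL to use", required = true)
    private String url;
    @SketchParam(description = "Database username.")
    private String username;

    String driver() { return driver; }
    String url() { return url; }
    String username() { return username; }
}

final class SketchBinder {
    private SketchBinder() {}

    // Binds "--name value" pairs onto the annotated fields of a new instance of `type`.
    static <T> T bind(Class<T> type, String... args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Arguments must be --name value pairs");
        }
        Map<String, String> values = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --name, got: " + args[i]);
            }
            values.put(args[i].substring(2), args[i + 1]);
        }
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            for (Field field : type.getDeclaredFields()) {
                SketchParam param = field.getAnnotation(SketchParam.class);
                if (param == null) {
                    continue; // not a configuration field
                }
                String value = values.get(field.getName());
                if (value == null) {
                    if (param.required()) {
                        throw new IllegalArgumentException("Missing required parameter --" + field.getName());
                    }
                    continue; // optional and absent: keep the field default
                }
                field.setAccessible(true); // private field, hence reflective injection
                field.set(instance, value);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot bind " + type.getName(), e);
        }
    }
}
```

The naming convention used by Yupiik Batch for arguments may differ from this sketch; only the principle (annotated fields populated from main arguments) is illustrated here.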
Reusable batch components
Reusable components are in io.yupiik.batch.runtime.component package. This section highlights a few of them.
io.yupiik.batch.runtime.component.AcceptedLossDiffFilter
Filters a Diff. If the acceptedLoss threshold is exceeded, i.e. more rows would be deleted than this percentage allows, the chain ends there.
The goal is to avoid wiping the data of a database when the incoming data is corrupted or not properly up to date.
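A minimal sketch of the accepted-loss check, assuming the loss is measured as deletions divided by the size of the reference dataset and that a loss equal to the threshold is still accepted. DiffCounts and AcceptedLossSketch are hypothetical types, not the actual AcceptedLossDiffFilter API.

```java
import java.util.function.Predicate;

// Hypothetical diff summary: size of the reference dataset and number of deletions.
record DiffCounts(long referenceSize, long deletions) {}

// Lets the chain continue only while deletions stay within the accepted loss ratio.
final class AcceptedLossSketch implements Predicate<DiffCounts> {
    private final double acceptedLoss; // e.g. 0.10 for 10%

    AcceptedLossSketch(double acceptedLoss) {
        if (acceptedLoss < 0 || acceptedLoss > 1) {
            throw new IllegalArgumentException("acceptedLoss must be in [0, 1]");
        }
        this.acceptedLoss = acceptedLoss;
    }

    @Override
    public boolean test(DiffCounts diff) {
        if (diff.referenceSize() == 0) {
            return true; // empty reference: nothing can be lost
        }
        double loss = (double) diff.deletions() / diff.referenceSize();
        return loss <= acceptedLoss;
    }
}
```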
io.yupiik.batch.runtime.component.DatasetDiffComputer
Takes as input two iterators representing sorted datasets.
Both datasets are compared, in streaming mode, using the Comparator passed to the constructor/factory method. This detects deletions and additions, which are reflected in the resulting Diff instance.
When the Comparator considers two rows equal, the BiPredicate checks whether they are actually equal: if not, the row is considered updated; otherwise it is unchanged and left out of the diff.
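The streaming comparison can be sketched as a merge of two sorted iterators that holds a single element of each dataset at a time. StreamingDiff and SketchDiff are hypothetical names; the real DatasetDiffComputer and Diff types may differ, and this sketch collects the diff in memory for simplicity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical diff result; the real Diff type may expose a different shape.
record SketchDiff<T>(List<T> added, List<T> deleted, List<T> updated) {}

final class StreamingDiff {
    private StreamingDiff() {}

    // Both iterators MUST be sorted consistently with `comparator`.
    static <T> SketchDiff<T> diff(Iterator<T> reference, Iterator<T> incoming,
                                  Comparator<T> comparator, BiPredicate<T, T> sameContent) {
        List<T> added = new ArrayList<>();
        List<T> deleted = new ArrayList<>();
        List<T> updated = new ArrayList<>();
        T ref = next(reference);
        T in = next(incoming);
        while (ref != null || in != null) {
            int cmp = ref == null ? 1 : in == null ? -1 : comparator.compare(ref, in);
            if (cmp < 0) {          // only in the reference data: deleted
                deleted.add(ref);
                ref = next(reference);
            } else if (cmp > 0) {   // only in the incoming data: added
                added.add(in);
                in = next(incoming);
            } else {                // same key: updated unless the content is equal
                if (!sameContent.test(ref, in)) {
                    updated.add(in);
                }
                ref = next(reference);
                in = next(incoming);
            }
        }
        return new SketchDiff<>(added, deleted, updated);
    }

    // Next element or null at the end, so datasets must not contain null elements.
    private static <T> T next(Iterator<T> it) {
        return it.hasNext() ? it.next() : null;
    }
}
```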
io.yupiik.batch.runtime.component.DiffExecutor
Applies a Diff computed by a DatasetDiffComputer.
It applies the diff to the database represented by the connectionSupplier, using the provided commitInterval. The statements are created using the related factories: insertFactory, updateFactory and deleteFactory.
Finally, the dryRun toggle simulates the processing without issuing any modification in the database.
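The commitInterval and dryRun behaviors can be sketched without a database. In this hypothetical example, statements are plain strings, execution and commit are callbacks, and a commit is assumed to happen every commitInterval statements plus once for the remainder; the actual DiffExecutor works with the JDBC statements built by its factories.

```java
import java.util.List;
import java.util.function.Consumer;

final class CommitIntervalSketch {
    private CommitIntervalSketch() {}

    // Executes `statements`, committing every `commitInterval` statements and once more
    // for a trailing partial batch. In dry-run mode nothing is executed nor committed.
    // Returns the number of commits issued.
    static int apply(List<String> statements, int commitInterval, boolean dryRun,
                     Consumer<String> execute, Runnable commit) {
        if (commitInterval <= 0) {
            throw new IllegalArgumentException("commitInterval must be > 0");
        }
        if (dryRun) {
            statements.forEach(s -> System.out.println("[dry-run] " + s));
            return 0;
        }
        int pending = 0;
        int commits = 0;
        for (String statement : statements) {
            execute.accept(statement);
            if (++pending == commitInterval) {
                commit.run();
                commits++;
                pending = 0;
            }
        }
        if (pending > 0) { // flush the last partial batch
            commit.run();
            commits++;
        }
        return commits;
    }
}
```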
io.yupiik.batch.runtime.component.Mapper
This component converts an input instance to an output instance based on a specification class.
The specification is a class decorated with @Mapping:
@Mapping(
from = IncomingModel.class,
to = OutputModel.class,
documentation = "Converts an input to an output.",
properties = {
@Property(type = CONSTANT, to = "outputFieldUrl", value = "https://foo.bar/"),
@Property(type = TABLE_MAPPING, from = "inputKeyField", to = "mappedOutput", value = "myLookupTable", onMissedTableLookup = FORWARD)
},
tables = {
@Mapping.MappingTable(
name = "myLookupTable",
entries = {
@Entry(input = "A", output = "1"),
@Entry(input = "C", output = "3")
}
)
})
public class MyMapperSpec {
@Mapping.Custom(description = "This will map X to Y.")
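// the @Table parameter is optional: declare it only when the custom mapping needs the lookup table
String outputField(final IncomingModel in, @Table("myLookupTable") final Map<String, String> myLookupTable) {
return ...; // custom mapping logic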
}
}
To get a mapper, call Mapper.mapper(MyMapperSpec.class); you can then insert this mapper in any BatchChain.
The specification API supports static mapping (properties) as well as custom mapping (@Mapping.Custom) for more advanced logic.
The companion class io.yupiik.batch.runtime.documentation.MapperDocGenerator generates Asciidoctor documentation for a mapper class.
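As an illustration of a TABLE_MAPPING property, the following sketch resolves a value through a lookup table such as myLookupTable. It assumes that FORWARD means "forward the input value unchanged when the table has no entry"; the OnMiss enum and its FAIL value are hypothetical and not part of the Yupiik Batch API.

```java
import java.util.Map;

final class TableMappingSketch {
    // Hypothetical miss policies; FORWARD is assumed to pass the input through unchanged.
    enum OnMiss { FORWARD, FAIL }

    private TableMappingSketch() {}

    // Looks `input` up in `table`, applying `onMiss` when there is no entry.
    static String lookup(Map<String, String> table, String input, OnMiss onMiss) {
        String mapped = table.get(input);
        if (mapped != null) {
            return mapped;
        }
        return switch (onMiss) {
            case FORWARD -> input;
            case FAIL -> throw new IllegalArgumentException("No entry for '" + input + "'");
        };
    }
}
```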
io.yupiik.batch.runtime.component.SQLQuery
Extracts data using a SQL query.
A custom mapper is called for each ResultSet row to convert the current row into an object passed to the rest of the BatchChain.
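The "one mapper call per ResultSet row" contract can be sketched as an Iterator adapter over java.sql.ResultSet. SketchRowMapper and ResultSetIterator are hypothetical names, not SQLQuery's implementation; connection and statement handling (including closing the ResultSet) are left out.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical row mapper: converts the current ResultSet row into an object.
@FunctionalInterface
interface SketchRowMapper<T> {
    T map(ResultSet row) throws SQLException;
}

// Exposes a ResultSet as an Iterator, calling the mapper once per row.
final class ResultSetIterator<T> implements Iterator<T> {
    private final ResultSet resultSet;
    private final SketchRowMapper<T> mapper;
    private Boolean rowAvailable; // null: cursor not advanced since the last next()

    ResultSetIterator(ResultSet resultSet, SketchRowMapper<T> mapper) {
        this.resultSet = resultSet;
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        if (rowAvailable == null) {
            try {
                rowAvailable = resultSet.next(); // advance the cursor lazily
            } catch (SQLException e) {
                throw new IllegalStateException(e);
            }
        }
        return rowAvailable;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        rowAvailable = null; // the next hasNext() call must move the cursor again
        try {
            return mapper.map(resultSet);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }
}
```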
io.yupiik.batch.runtime.component.uship.DatabaseDiffExecutor
Applies a Diff computed by a DatasetDiffComputer when the entities (diff model) are Yupiik UShip Persistence models.
It applies the diff to a database with the provided commitInterval.
The dryRun toggle simulates the processing without issuing any modification in the database.
Combine components
Combining components can be done in a plain main:
final var referenceData = new ReferenceDataSQLQuery(databaseConnection);
final var newInputs = new MyNewInputs();
final var comparingProcessor = new ReferencialRowDatasetDiffComputer();
final var diff = comparingProcessor.apply(referenceData, newInputs);
if (new AcceptedLossDiffFilter(0.10).test(diff)) {
new ReferenceDataDiffExecutor(databaseConnection, 25).accept(diff);
}
However, when this gets too noisy, a fluent API on the diff makes the code more readable and composable. For instance, you can use the Stream or Optional API:
// init
final var diff = new ReferencialRowDatasetDiffComputer()
.apply(new ReferenceDataSQLQuery(databaseConnection), new MyNewInputs());
// start the flow
Stream.of(diff)
.filter(new AcceptedLossDiffFilter(0.10))
.forEach(new ReferenceDataDiffExecutor(databaseConnection, 25));
// or
Optional.of(diff)
.filter(new AcceptedLossDiffFilter(0.10))
.ifPresent(new ReferenceDataDiffExecutor(databaseConnection, 25));
Thanks to the Stream or Optional fluent API, the processing flow reads more explicitly, and it becomes easier to insert a step or to decorate components explicitly.
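Since the filter is used as a Predicate and the executor as a Consumer in the snippets above, decorating a component only needs the standard java.util.function combinators. This hypothetical sketch uses integers in place of a real Diff and adds a logging step in front of the executor with Consumer.andThen.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class FluentFlowSketch {
    private FluentFlowSketch() {}

    // Runs the executor only if the filter accepts the diff, logging right before execution.
    static <D> void run(D diff, Predicate<D> filter, Consumer<D> executor, List<String> log) {
        Consumer<D> logging = d -> log.add("applying " + d);
        Optional.of(diff)
                .filter(filter)
                .ifPresent(logging.andThen(executor)); // decorates the executor without modifying it
    }
}
```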
However, these two APIs are not designed for this use case and quickly hit limitations. For a more batch-oriented approach, the parent Batch class lets you define a stream-like flow dedicated to batches. Start your flow with from() or with a specific source such as DatasetDiffComputer. For example:
@Override
public void accept(final MyConfiguration configuration) {
final var datasource = configuration.datasource;
final var connectionProvider = datasource.toConnectionProvider();
referencialRowDatasetDiff()
.withCustomData(myInput())
.withReferencialData(referenceData(datasource, configuration.table))
.diff()
.filter(withAcceptedLoss(configuration.acceptedLoss))
.then(applyDiff(connectionProvider, configuration.commitInterval, configuration.dryRun, configuration.table))
.run(runConfiguration(datasource, getClass().getSimpleName(), systemUTC()));
}
This DSL is better suited to the batches we write, since it integrates with the default components.
IMPORTANT: until you hit …
TIP: Some components have a static factory method that makes the code more expressive; don't hesitate to use it.
Finally, the RunConfiguration lets you intercept any step of the BatchChain defined with this DSL. Combined with ExecutionTracer, it lets you store each execution and its steps in a database for audit or monitoring/administration purposes.
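Step interception can be pictured as a chain of named steps that notifies a listener before and after each step. This is a hypothetical sketch, not the RunConfiguration or ExecutionTracer API: a tracer could persist these events in a database, whereas this example only records them in a list.

```java
import java.util.function.Function;

final class TracedChainSketch<T> {
    // Hypothetical interception hook called around each named step.
    interface StepListener {
        void before(String step);
        void after(String step, boolean success);
    }

    private final T value;
    private final StepListener listener;

    private TracedChainSketch(T value, StepListener listener) {
        this.value = value;
        this.listener = listener;
    }

    static <T> TracedChainSketch<T> from(T value, StepListener listener) {
        return new TracedChainSketch<>(value, listener);
    }

    // Applies a named step, notifying the listener before it starts and after it ends.
    <R> TracedChainSketch<R> map(String name, Function<? super T, ? extends R> step) {
        listener.before(name);
        R result;
        try {
            result = step.apply(value);
        } catch (RuntimeException e) {
            listener.after(name, false); // record the failure, then propagate it
            throw e;
        }
        listener.after(name, true);
        return new TracedChainSketch<>(result, listener);
    }

    T get() {
        return value;
    }
}
```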
Async result
It can be useful to pass a step result to the next step before the step is finished.
This is often the case for reactive batches: one "thread" starts polling data, the next step processes it, and so on, but you want to keep polling and processing split in terms of "steps" and tracing.
To do so, return a BatchPromise, which simply holds a value (a reactive one in this example) and a CompletionStage that notifies the batch runtime and the tracer when the step is done:
from()
.map("step1", new CommentifiableFunction<Void, BatchPromise<Observable<String>>>() {
@Override
public BatchPromise<Observable<String>> apply(final Void o) {
final var reactiveComponent = runStep1(); // an Observable with RxJava for example
// pass the Observable itself to step2; the CompletionStage signals when step1 is done
return BatchPromise.of(reactiveComponent, reactiveComponent.toCompletionStage());
}
})
.map("step2", new CommentifiableFunction<BatchPromise<Observable<String>>, BatchPromise<Void>>() {
@Override
public BatchPromise<Void> apply(final BatchPromise<Observable<String>> in) {
final var promise = new CompletableFuture<Void>();
in.subscribe( // pseudocode: subscribe to the Observable carried by the incoming promise
this::doStep2ItemProcessing,
error -> {
// log etc...
promise.completeExceptionally(error);
},
() -> promise.complete(null));
return BatchPromise.of(null, promise);
}
})
.run(tracingConfig);
Overall, step1 starts reading data and emitting it, while step2 starts and subscribes to it.
When step1 is done, it notifies the batch runtime, and the tracer (or runtime) stops monitoring step1.
Step2 is asynchronous too: due to its reactive nature, it also emits a BatchPromise, which leads to the same behavior.
TIP: Thanks to this trick, you can run a concurrent job with a flat chain since you can pass in the …
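The same mechanism can be sketched with the JDK only, without RxJava or the BatchPromise API. SketchPromise is a hypothetical stand-in for BatchPromise: step1 returns a queue right away while a background task fills it, step2 starts consuming before step1 is done, and each done stage tells the caller when its step is actually finished.

```java
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical holder: a value usable right away plus a stage completed when the step is done.
record SketchPromise<T>(T value, CompletionStage<Void> done) {}

final class AsyncStepsSketch {
    // Sentinel marking the end of the stream (source items must not equal it).
    static final String END = "<end>";

    private AsyncStepsSketch() {}

    // Step 1: returns the queue immediately while a background task keeps filling it.
    static SketchPromise<BlockingQueue<String>> step1(List<String> source, Executor executor) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CompletableFuture<Void> done = CompletableFuture.runAsync(() -> {
            source.forEach(queue::add); // emit items one by one
            queue.add(END);             // then signal the end of the stream
        }, executor);
        return new SketchPromise<>(queue, done);
    }

    // Step 2: starts consuming before step 1 is finished; completes once the sentinel is read.
    static SketchPromise<List<String>> step2(SketchPromise<BlockingQueue<String>> in, Executor executor) {
        List<String> processed = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture.runAsync(() -> {
            try {
                String item;
                while (!END.equals(item = in.value().take())) { // blocks until step 1 emits
                    processed.add(item.toUpperCase(Locale.ROOT));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }, executor);
        return new SketchPromise<>(processed, done);
    }
}
```

The caller plays the runtime role here: it waits for both done stages, for instance with CompletableFuture.allOf(...), before considering the chain finished. A dedicated executor is passed in so that the blocking consumer cannot starve the producer.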
Reusable Iterators
Reusable iterators are provided either through FluentIterator or through extensions (in the latter case, you must add the extension dependency).
io.yupiik.batch.iterator.excel.component.ExcelIterator
Reads an Excel file sheet row by row.
Factories
<A> ExcelIterator<A> ExcelIterator.of(Path,int,RowMapper<A>)
Creates a new ExcelIterator with a custom row mapper.
ExcelIterator<List<String>> ExcelIterator.ofLines(Path,int)
Creates a new ExcelIterator with a default row mapper mapping each row to a List<String>.
Dependency
<dependency>
<groupId>io.yupiik.batch</groupId>
<artifactId>excel-iterator</artifactId>
<version>${yupiik-batch.version}</version>
</dependency>
TIP: Use FluentIterator to filter and map your input Iterator: it makes the code more readable and adds some features Stream does not have, like a more advanced distinct implementation.
Here is a sample input iterator written with FluentIterator: it filters the raw input with a business rule, then maps the filtered data to another model.
final var iterator = FluentIterator.of(myIterator()) (1)
.filter(new MyBusinessFilter()) (2)
.map(new MyBusinessMapper()) (3)
.unwrap(); (4)
(1) Create a FluentIterator from the myIterator() result.
(2) Filter the iterator with a Predicate.
(3) Map the iterator data to another model with a mapper.
(4) Remove the enclosing FluentIterator wrapper, which is not needed once the full iterator chain is defined (optional).
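A minimal, lazy filter/map/unwrap wrapper in the spirit of FluentIterator could look like the following. MiniFluentIterator is a hypothetical class, not the FluentIterator implementation, and it does not provide the advanced distinct mentioned in the tip.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

final class MiniFluentIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;

    private MiniFluentIterator(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    static <T> MiniFluentIterator<T> of(Iterator<T> iterator) {
        return new MiniFluentIterator<>(iterator);
    }

    // Lazily skips the elements rejected by the predicate.
    MiniFluentIterator<T> filter(Predicate<? super T> predicate) {
        return new MiniFluentIterator<>(new Iterator<T>() {
            private T nextItem;
            private boolean ready;

            @Override
            public boolean hasNext() {
                while (!ready && delegate.hasNext()) {
                    T candidate = delegate.next();
                    if (predicate.test(candidate)) {
                        nextItem = candidate;
                        ready = true;
                    }
                }
                return ready;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                ready = false;
                T item = nextItem;
                nextItem = null;
                return item;
            }
        });
    }

    // Lazily converts each element.
    <R> MiniFluentIterator<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniFluentIterator<>(new Iterator<R>() {
            @Override public boolean hasNext() { return delegate.hasNext(); }
            @Override public R next() { return mapper.apply(delegate.next()); }
        });
    }

    // Returns the plain iterator once the chain is defined.
    Iterator<T> unwrap() {
        return delegate;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
}
```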