On a project I’m currently working on, we had to pick up files from an endpoint, but the files could have dependencies on (or relations with) each other. Of course this is not an ideal situation; it would be more desirable to process each file independently. Unfortunately, the system responsible for delivering the data was not capable of providing it in a different way.
An example of these files can be found below. As you can see, file-3.txt needs file-1.txt to be processed beforehand. After file-3.txt, file-4.txt can be processed, and so on. The last file, file-6.txt, should always be processed last, because it (indirectly) depends on all other files.
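As a sketch of such a set of files (only the file-1.txt → file-3.txt → file-4.txt chain and file-6.txt's position come from the description above; the remaining edges are assumptions for illustration):

```
file-1.txt    (no dependencies)
file-2.txt    (no dependencies)
file-3.txt    depends on file-1.txt
file-4.txt    depends on file-3.txt
file-5.txt    depends on file-2.txt
file-6.txt    depends on file-4.txt and file-5.txt
```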
In the above example it still looks pretty simple, of course. We could just make a static list specifying the order in which to process the files. In our situation, however, there were a lot more files. Keeping such a list up to date would get really complex really fast, especially when files have multiple (nested) dependencies.
Because of this complexity, we decided to make the application responsible for determining in which order the files should be processed.
This is possible with a topological sorting algorithm. Such an algorithm uses a directed acyclic graph (DAG) to indicate precedence among things (in our case: files). A lot of articles can be found about topological sorting algorithms and the various ways to implement them; I found the Topological Sort of Directed Acyclic Graph article from Baeldung really helpful.
From this article, we used “Algorithm 3: DFS Algorithm for Topological Sort” as our main inspiration. The only difference is that, besides the “visited” collection, we also keep track of the elements that we are currently visiting. This way we can detect cyclic dependencies.
In our case, all file types have been defined in classes that implement the FileWithDependencies interface. Via this interface, it’s possible to specify which of the other files a given type depends on.
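The article doesn’t show the interface itself, so here is a minimal sketch of what it could look like. The names `retrieveFileName`, `retrieveDependencies`, and `FILE_TYPES` appear in the code below; everything else (the instance methods, the reflective instantiation, the example file classes) is an assumption about how it might be wired:

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch: each file type exposes its own file name and
// dependencies, and the static helpers used by the sorter read them
// from a reflectively created instance.
interface FileWithDependencies {

    String fileName();

    List<Class<? extends FileWithDependencies>> dependencies();

    // Registry of all known file types (the article only shows that
    // such a constant exists; its contents here are examples).
    Set<Class<? extends FileWithDependencies>> FILE_TYPES =
            Set.of(File1.class, File3.class);

    static FileWithDependencies instantiate(Class<? extends FileWithDependencies> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type, e);
        }
    }

    static String retrieveFileName(Class<? extends FileWithDependencies> type) {
        return instantiate(type).fileName();
    }

    static List<Class<? extends FileWithDependencies>> retrieveDependencies(
            Class<? extends FileWithDependencies> type) {
        return instantiate(type).dependencies();
    }
}

// Two example file types matching the files from the introduction.
class File1 implements FileWithDependencies {
    public String fileName() { return "file-1.txt"; }
    public List<Class<? extends FileWithDependencies>> dependencies() { return List.of(); }
}

class File3 implements FileWithDependencies {
    public String fileName() { return "file-3.txt"; }
    public List<Class<? extends FileWithDependencies>> dependencies() { return List.of(File1.class); }
}
```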
private static Deque<Class<? extends FileWithDependencies>> topologicalSort(
        Set<Class<? extends FileWithDependencies>> files) {
    Deque<Class<? extends FileWithDependencies>> deque = new ArrayDeque<>();
    Set<Class<? extends FileWithDependencies>> visiting = new HashSet<>();
    Set<Class<? extends FileWithDependencies>> visited = new HashSet<>();

    for (var current : files) {
        if (!visited.contains(current)) {
            topologicalSortRecursive(current, visiting, visited, deque);
        }
    }
    return deque;
}

private static void topologicalSortRecursive(Class<? extends FileWithDependencies> current,
                                             Set<Class<? extends FileWithDependencies>> visiting,
                                             Set<Class<? extends FileWithDependencies>> visited,
                                             Deque<Class<? extends FileWithDependencies>> deque) {
    if (visiting.contains(current)) {
        throw new RuntimeException("Cyclic dependency detected");
    }
    visiting.add(current);

    for (var dependency : FileWithDependencies.retrieveDependencies(current)) {
        if (!visited.contains(dependency)) {
            topologicalSortRecursive(dependency, visiting, visited, deque);
        }
    }

    deque.addLast(current);
    visited.add(current);
    visiting.remove(current);
}
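To see the algorithm in isolation, the same DFS can be sketched on plain strings instead of file-type classes. The dependency map here is a made-up stand-in for the interface lookup:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the same DFS-based topological sort,
// using strings and a hard-coded example dependency map.
class TopologicalSortDemo {

    static final Map<String, List<String>> DEPENDENCIES = Map.of(
            "file-1.txt", List.of(),
            "file-3.txt", List.of("file-1.txt"),
            "file-4.txt", List.of("file-3.txt"),
            "file-6.txt", List.of("file-4.txt"));

    static List<String> topologicalSort(Set<String> files) {
        Deque<String> deque = new ArrayDeque<>();
        Set<String> visiting = new HashSet<>();
        Set<String> visited = new HashSet<>();
        for (String current : files) {
            if (!visited.contains(current)) {
                visit(current, visiting, visited, deque);
            }
        }
        return new ArrayList<>(deque);
    }

    static void visit(String current, Set<String> visiting,
                      Set<String> visited, Deque<String> deque) {
        // A node we are still visiting can only be reached again via a cycle.
        if (visiting.contains(current)) {
            throw new RuntimeException("Cyclic dependency detected");
        }
        visiting.add(current);
        for (String dependency : DEPENDENCIES.getOrDefault(current, List.of())) {
            if (!visited.contains(dependency)) {
                visit(dependency, visiting, visited, deque);
            }
        }
        // Dependencies were added first, so they end up before this file.
        deque.addLast(current);
        visited.add(current);
        visiting.remove(current);
    }
}
```

Because each file is appended only after all of its dependencies, iterating the result front to back always yields a valid processing order.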
In Camel you can provide a Comparator to the sorter parameter on file-based endpoints. Now that we can sort our files and dependencies using the topological sorting algorithm, implementing that Comparator is fairly easy.
We will use the sorted list to assign “importance” to a file name. In the actual compare step, we can just compare these “importance” values:
public class TopologicalSorter implements Comparator<GenericFile<Object>> {

    private final Map<String, Integer> fileImportance;

    public TopologicalSorter() {
        Set<Class<? extends FileWithDependencies>> unsorted = FileWithDependencies.FILE_TYPES;
        fileImportance = Collections.unmodifiableMap(constructFileImportance(unsorted));
    }

    @Override
    public int compare(GenericFile<Object> file1, GenericFile<Object> file2) {
        return Integer.compare(getImportance(file1), getImportance(file2));
    }

    private int getImportance(GenericFile<Object> file) {
        // Unknown files get importance -1, so they sort before all known files.
        return fileImportance.getOrDefault(file.getFileName(), -1);
    }

    private static Map<String, Integer> constructFileImportance(
            Set<Class<? extends FileWithDependencies>> unsorted) {
        var sorted = topologicalSort(unsorted);
        var sortedSize = sorted.size();

        Map<String, Integer> result = new LinkedHashMap<>(sortedSize);
        for (var i = 0; i < sortedSize; i++) {
            var file = sorted.removeFirst();
            var fileName = FileWithDependencies.retrieveFileName(file);
            result.put(fileName, i);
        }
        return result;
    }

    // ... The topological sort logic from the previous examples ...
}
Now that we have the Comparator in place, we can supply it to a file-based endpoint via the sorter parameter:
public class TopologicalSortRouteBuilder extends RouteBuilder {

    @Override
    public void configure() {
        String includeFilter = FileWithDependencies.FILE_TYPES.stream()
                .map(FileWithDependencies::retrieveFileName)
                .collect(Collectors.joining(","));

        from("file:/tmp?antInclude=" + includeFilter + "&doneFileName=done&sorter=#topologicalSorter")
                .log("Processed ${headers.CamelFileName}");
    }
}
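Note that `sorter=#topologicalSorter` looks up the Comparator by name in the Camel registry, so it has to be bound there first. How that happens depends on your runtime; a plain-Java wiring sketch (assuming Camel 3+, where `Registry.bind` is available) could look like this:

```java
// Wiring sketch (assumption: plain Camel 3+ without Spring).
// Binds the comparator under the name used in sorter=#topologicalSorter.
CamelContext context = new DefaultCamelContext();
context.getRegistry().bind("topologicalSorter", new TopologicalSorter());
context.addRoutes(new TopologicalSortRouteBuilder());
context.start();
```

In a Spring Boot setup, registering the TopologicalSorter as a bean achieves the same, since Camel can resolve `#` references against the Spring application context.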
The full code is available on the Whitehorses Bitbucket - here. Besides the working code, it also contains a unit test that you can use to see the feature in action.
Please note that if you run the unit test a couple of times, you might see different orders in which the files are processed in the log file. This is because the dependency graph allows multiple “correct” orders.
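For instance, given only the dependencies named in the text (file-1.txt before file-3.txt, file-3.txt before file-4.txt, file-6.txt last; the positions of the other files are arbitrary here), both of these orders would be valid:

```
file-1.txt, file-2.txt, file-3.txt, file-4.txt, file-5.txt, file-6.txt
file-2.txt, file-5.txt, file-1.txt, file-3.txt, file-4.txt, file-6.txt
```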
Even though we got this logic working, there are still quite a few snags to keep in mind when implementing similar logic.
Researching topological sorting algorithms and implementing the sorter to process dependent files in Apache Camel was a fun thing to do, and in the end we got it working in a solution that meets our needs.
However, there are a lot of things to keep in mind while implementing a solution like this, especially in more complex environments where applications are run replicated, or where more elaborate error handling flows need to be implemented.
So my advice would still be to check whether the upstream system is capable of providing the data in a way that each file can be processed completely independently. But if that is not the case, maybe this article can serve as a guide in the right direction!