One big downside of using Cascading for our applications has been the runtime of our regression test suite. We test with quantities of data nowhere near our regular production volume, but we still end up running lots of jobs. In our experience, this makes the tests take a long time (in the tens of minutes), killing our ability to iterate quickly.
After looking more deeply into the issue, we discovered that it all came down to one particular polling interval. When Cascading launches a Flow, it will launch one or more actual Hadoop jobs and then wait until each completes before launching the next in the pipeline. The problem is that the amount of time Cascading waits between checks for job completion is set to something like 5 seconds by default! This setting makes plenty of sense for real-world jobs, which should all be at least minutes in length – 5 seconds isn’t going to make a difference one way or another. However, in our ultra-short job scenario, it makes all the difference. If your Flow works out to 10 jobs that run serially, the fastest it could complete is 50 seconds (10 jobs × 5 seconds each), no matter how quickly the jobs themselves finish.
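To make that arithmetic concrete, here is a toy model rather than anything from Cascading itself. It assumes the runner sleeps for the polling interval between checks, so a job is only seen as finished at the first poll at or after it really completes, and that jobs run strictly one after another. The class name, method name, and the 200 ms job duration are all made up for illustration.

```java
/** Toy model of a serial job pipeline whose completion is detected by polling. Not Cascading code. */
final class PollingModel {
    private PollingModel() {}

    /**
     * Lower bound on wall-clock time for a Flow, assuming (hypothetically) that the
     * runner sleeps pollMillis between checks, so a job is seen as finished only at
     * the first poll at or after its real completion time.
     */
    static long minFlowMillis(int jobs, long jobMillis, long pollMillis) {
        // Number of polls needed before one job is seen as done (at least one).
        long polls = Math.max(1, (jobMillis + pollMillis - 1) / pollMillis);
        return jobs * polls * pollMillis;
    }

    public static void main(String[] args) {
        // Ten jobs of an assumed 200 ms each: 5 s polling versus 100 ms polling.
        System.out.println(minFlowMillis(10, 200, 5000)); // 50000
        System.out.println(minFlowMillis(10, 200, 100));  // 2000
    }
}
```

Under these assumptions the same ten jobs are bounded by 50 seconds with a 5 second interval and by 2 seconds with a 100 millisecond interval, which is why the interval dominates when the jobs themselves are tiny.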
Initially, we customized Cascading 1.0.8 to reduce this wait time to about 100 milliseconds. However, when we recently upgraded to Cascading 1.1, we were pleasantly surprised to find that this polling interval was now configurable. Generally, it looks something like this:
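Map<Object, Object> properties = new HashMap<Object, Object>();
// Check for job completion every 100 milliseconds instead of the default.
properties.put("cascading.flow.job.pollinginterval", 100);
new FlowConnector(properties).connect(...).complete();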
With a convenient way to change this parameter, the only other thing we need is a convenient way to set its value per environment. Ideally, we’d like to leave the parameter alone during production runs and only set it low during our test suite. This is a little tricky because, unlike Hadoop, Cascading itself doesn’t provide any global configuration framework.
The solution we ended up going with was to provide a class with a static method for getting new FlowConnectors, which we call instead of the standard constructor. This method allows the user to provide any options they need and merges in whatever the polling interval should be for the current environment. It looks something like this:
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowConnector;

public final class CascadingHelper {
    // Environment-wide defaults; the test suite puts its polling interval here.
    public static final Map<Object, Object> DEFAULT_PROPERTIES = new HashMap<Object, Object>();

    private CascadingHelper() {}

    public static FlowConnector getFlowConnector() {
        return new FlowConnector(DEFAULT_PROPERTIES);
    }

    public static FlowConnector getFlowConnector(Map<Object, Object> properties) {
        // Start from the defaults, then let the caller's options override them.
        Map<Object, Object> combined = new HashMap<Object, Object>(DEFAULT_PROPERTIES);
        combined.putAll(properties);
        return new FlowConnector(combined);
    }
}
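The merge order is the part worth getting right: defaults go in first and the caller's options go in second, so the caller wins on any conflicting key. The stand-alone sketch below shows just that step with plain maps and no Cascading dependency. The class name and the "example.option" key are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone illustration of the merge order; hypothetical class, no Cascading dependency. */
final class PropertyMergeSketch {
    private PropertyMergeSketch() {}

    /** Copies the defaults, then overlays the caller's entries so that they win on conflict. */
    static Map<Object, Object> merge(Map<Object, Object> defaults, Map<Object, Object> overrides) {
        Map<Object, Object> combined = new HashMap<Object, Object>(defaults);
        combined.putAll(overrides);
        return combined;
    }

    public static void main(String[] args) {
        Map<Object, Object> defaults = new HashMap<Object, Object>();
        defaults.put("cascading.flow.job.pollinginterval", 100);

        Map<Object, Object> callerOptions = new HashMap<Object, Object>();
        callerOptions.put("example.option", "value"); // hypothetical key, for illustration only

        Map<Object, Object> combined = merge(defaults, callerOptions);
        System.out.println(combined.get("cascading.flow.job.pollinginterval")); // 100
        System.out.println(combined.get("example.option"));                     // value
    }
}
```

Because the merge builds a fresh map each time, a caller's options never leak into the shared defaults.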
Finally, in our test suite, we made all of our tests inherit from a common base test class that reconfigures the default polling interval in the class constructor. Voila, our test suite takes 40% less time!
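A base class along these lines might look like the sketch below. To keep it compilable without Cascading on the classpath, it uses a stand-in holder for the helper's DEFAULT_PROPERTIES map. The class names and the 100 millisecond value are assumptions, and a real base class would also extend whatever your test framework requires.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for CascadingHelper.DEFAULT_PROPERTIES so the sketch compiles without Cascading. */
final class HelperDefaults {
    static final Map<Object, Object> DEFAULT_PROPERTIES = new HashMap<Object, Object>();

    private HelperDefaults() {}
}

/** Hypothetical common base class for tests that run Flows. */
abstract class FastPollingTestCase {
    // Assumed value: the 100 ms interval mentioned earlier in the post.
    static final int TEST_POLLING_INTERVAL_MILLIS = 100;

    protected FastPollingTestCase() {
        // Runs when a test instance is created, before the test builds any FlowConnector.
        HelperDefaults.DEFAULT_PROPERTIES.put(
            "cascading.flow.job.pollinginterval", TEST_POLLING_INTERVAL_MILLIS);
    }
}

/** Example subclass: extending the base class is all a test has to do. */
final class ExampleFlowTest extends FastPollingTestCase {
}
```

Production code never instantiates the base class, so the defaults stay empty there and Cascading keeps its own polling interval.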
The only remaining downside is that if someone forgets to use either the FlowConnector provider or the test base class, the Flows in their tests quietly run at the slow default interval, so constant vigilance is required. Still, making this change has caused our build to run somewhere between 2x and 3x as fast, which is just plain awesome.
Follow Bryan on Twitter: @bryanduxbury







