Ramandeep Singh Nanda
Published Sat 15 December 2018


Reactively Streaming CSV Using RxJava

RxJava is an extremely useful streaming framework (here is an example application that uses it to make parallel RESTful calls to both Uber and Lyft: RT_UBER_NYC_TAXI). In this post, I will cover how you can reactively stream and process a CSV file.

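First, you can create a Flowable of CSVRecord (from Apache Commons CSV) by converting the parser's iterator into a Flowable with Flowable.fromIterable(). Next, we want resource usage to be safe, i.e., we don't want to leave file handles open, so we use the resource-safe Flowable.using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceDisposer) method. The first argument opens the resource, the second builds the Flowable from it, and the last argument is a resource disposer that closes the resource when the stream completes, fails, or is cancelled. Because the supplier is invoked once per subscription, every subscriber gets its own reader.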

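import io.reactivex.Flowable;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public static Flowable<CSVRecord> readRecordsFromFile(Path inputFile, CSVFormat csvFormat) {
  return Flowable.using(
      () -> Files.newBufferedReader(inputFile),                                    // open one reader per subscriber
      bufferedReader -> csvRecordFlowable(bufferedReader, csvFormat.withHeader()), // first line is the header
      BufferedReader::close                                                        // always release the file handle
  );
}

private static Flowable<CSVRecord> csvRecordFlowable(BufferedReader br, CSVFormat csvFormat) {
  try {
    final CSVParser csvParser = new CSVParser(br, csvFormat);
    // CSVParser is Iterable<CSVRecord>; records are pulled from it only as downstream requests them
    return Flowable.fromIterable(csvParser);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}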
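To check the resource-safety claim without a file on disk, here is a minimal sketch assuming RxJava 2.x (io.reactivex packages) and Apache Commons CSV on the classpath. It swaps Files.newBufferedReader for a BufferedReader over an in-memory string; the class UsingDisposalDemo, its records method and the closed flag are hypothetical names introduced only for this demo. It takes just the first record, which cancels the rest of the stream, and then checks whether the disposer ran.

```java
import io.reactivex.Flowable;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class UsingDisposalDemo {
  // Hypothetical flag flipped by the disposer so we can observe that it ran.
  static final AtomicBoolean closed = new AtomicBoolean(false);

  // Same shape as readRecordsFromFile, but reading from a String (assumption for the demo).
  static Flowable<CSVRecord> records(String csv) {
    return Flowable.using(
        () -> new BufferedReader(new StringReader(csv)),            // resource supplier
        reader -> Flowable.fromIterable(
            new CSVParser(reader, CSVFormat.DEFAULT.withHeader())),  // source supplier
        reader -> {                                                  // resource disposer
          reader.close();
          closed.set(true);
        });
  }

  public static void main(String[] args) {
    String csv = "name,city\nalice,nyc\nbob,sf\ncarol,la\n";
    // take(1) cancels the upstream right after the first record is delivered.
    List<CSVRecord> first = records(csv).take(1).toList().blockingGet();
    System.out.println(first.get(0).get("name") + ", closed=" + closed.get());
  }
}
```

Because Flowable.using runs the disposer on cancellation as well as on completion or error, the reader should be closed here even though most of the input was never consumed.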

The readRecordsFromFile method above sets up a Flowable<CSVRecord> that can then be processed in different ways, and you get all the usual Flowable features such as backpressure. Example usage:

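import io.reactivex.schedulers.Schedulers;
import java.nio.file.Paths;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

readRecordsFromFile(Paths.get("sample.csv"), CSVFormat.DEFAULT)
    .parallel()
    .runOn(Schedulers.io())
    .filter(record -> record.isConsistent()) // filter here, e.g. drop rows whose width differs from the header
    .map(record -> record)                   // map here (identity placeholder)
    .sequential()                            // rails are merged back; original record order is not preserved
    .subscribe(new Subscriber<CSVRecord>() {
      private Subscription sub;

      @Override
      public void onSubscribe(Subscription s) {
        sub = s;
        sub.request(1); // request the first item; without this nothing is ever emitted
      }

      @Override
      public void onNext(CSVRecord record) {
        // do something here, then request the next item
        sub.request(1);
      }

      @Override
      public void onError(Throwable t) {
        // handle error
      }

      @Override
      public void onComplete() {
        // all records have been processed
      }
    });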
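Backpressure can also be observed in isolation with RxJava's TestSubscriber, which lets you control exactly how many items are requested. This is a sketch under the same assumptions as before (RxJava 2.x, Apache Commons CSV, an in-memory CSV string instead of a file); BackpressureDemo and records are hypothetical names.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.BufferedReader;
import java.io.StringReader;

public class BackpressureDemo {
  // Same shape as readRecordsFromFile, but over an in-memory string (assumption for the demo).
  static Flowable<CSVRecord> records(String csv) {
    return Flowable.using(
        () -> new BufferedReader(new StringReader(csv)),
        reader -> Flowable.fromIterable(new CSVParser(reader, CSVFormat.DEFAULT.withHeader())),
        BufferedReader::close);
  }

  public static void main(String[] args) {
    String csv = "id,value\n1,a\n2,b\n3,c\n4,d\n";
    // Subscribe with an initial request of 0: no records are emitted yet.
    TestSubscriber<CSVRecord> ts = records(csv).test(0);
    System.out.println("after request(0): " + ts.values().size());
    // Ask for two records: exactly two are delivered and the stream is not finished.
    ts.request(2);
    System.out.println("after request(2): " + ts.values().size());
    // Ask for everything else: the remaining records arrive and the stream completes.
    ts.request(Long.MAX_VALUE);
    System.out.println("after request(MAX): " + ts.values().size());
    ts.assertComplete();
  }
}
```

The subscriber decides how much is read: the Flowable only emits records that have been requested, which is the same mechanism the request(1) calls in the usage example rely on.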