Ramandeep Singh Nanda
Published

Fri 21 December 2018


Java WatchService

This post walks through a tutorial with several moving pieces. It covers the following:

  • Java WatchService
  • Spring Boot
  • Initialization-on-demand holder idiom
  • Managing concurrency
  • RxJava
  • Lombok (because why type more?)

The example is a Spring Boot REST service that serves the records of CSV files stored in a directory. In addition, a WatchService monitors that directory for changes, specifically the creation and removal of CSV files.

Let's start with the pieces:

  • We want to access the records of a CSV file. So the first thing we need to do is either search the directory for the file on every request or maintain an in-memory lookup from file name to path. With an in-memory lookup, reads need to be fast, so a HashMap-like structure such as Map<String,Path> should suffice.
  • However, the entries in that map must be updated as files are added or deleted, and those updates come from the watch service while requests are reading the map. A plain HashMap is not safe for that, so a better data structure is a ConcurrentHashMap<String,Path>.
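  • The WatchService needs to be polled periodically in the background. An RxJava interval stream running on Schedulers.io() should suffice. Note that the scheduler is passed to interval() itself, because subscribeOn() alone leaves the ticks on the computation scheduler.
 Flowable.interval(5, TimeUnit.SECONDS, Schedulers.io()).forEach(tick -> { /* do something */ });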
  • Initialization-on-demand holder idiom: this is used to safely create a singleton instance of our map. It works because class initialization is lazy (the nested class is initialized only on first access) and the JVM guarantees that it runs exactly once, even when several threads race to trigger it.
//Lazy holder idiom for initializing the singleton map
private static class CSVMapping {

  private static final ResourceBundle rb = ResourceBundle.getBundle("app");
  //safe creation
  private static final Map<String, Path> CSVMap = getCSVMap();
  // ... helper methods omitted here; the full class appears in the complete service
}
  • Reading CSV using RxJava: I covered this in an earlier post.
  • Creating a Spring Boot REST controller: the controller checks whether the file name in the GET request is present in the map. If it is, we collect the CSV records and return them in the response; otherwise we return a 404 error.
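
To see the holder idiom and the concurrent map working together outside of Spring, here is a minimal sketch that uses only the JDK. The names LazyRegistryDemo, Holder, INIT_COUNT and populateConcurrently are made up for this illustration and are not part of the service; the counter exists only to show that the initializer runs once.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the service in this post.
class LazyRegistryDemo {

  // Counts how often the holder's initializer has run.
  static final AtomicInteger INIT_COUNT = new AtomicInteger();

  // The JVM initializes Holder on the first access to REGISTRY,
  // not when LazyRegistryDemo itself is loaded.
  private static class Holder {
    static final Map<String, Path> REGISTRY = create();

    private static Map<String, Path> create() {
      INIT_COUNT.incrementAndGet();
      return new ConcurrentHashMap<>();
    }
  }

  static Map<String, Path> registry() {
    return Holder.REGISTRY;
  }

  // Starts one writer thread per entry and returns the map size once all are done.
  static int populateConcurrently(int threads) {
    CountDownLatch done = new CountDownLatch(threads);
    for (int i = 0; i < threads; i++) {
      String name = "file" + i + ".csv";
      new Thread(() -> {
        registry().put(name, Paths.get("data", name));
        done.countDown();
      }).start();
    }
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return registry().size();
  }

  public static void main(String[] args) {
    System.out.println("initializer runs before first use: " + INIT_COUNT.get());
    System.out.println("entries after concurrent writes: " + populateConcurrently(8));
    System.out.println("initializer runs after use: " + INIT_COUNT.get());
  }
}
```

Every thread goes through registry(), yet the initializer runs a single time, and no write is lost even though nothing in the demo uses an explicit lock.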

Putting it all together:

The code below shows the entire service.

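// Imports assume RxJava 2 (io.reactivex) and Apache Commons CSV.
// CSVUtil is the helper from the earlier post; import it from wherever you placed it.
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.csv.CSVFormat;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@Slf4j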
@RestController
public class CSVFileWatcher {

  @GetMapping("/getcsv/{fileName}")
  public ResponseEntity<List<Iterator<String>>> readCSVFile(
      @PathVariable("fileName") String fileName) {
    // Single lookup: a containsKey/get pair could race with the watcher removing the entry
    val csvPath = CSVMapping.CSVMap.get(fileName);
    if (csvPath != null) {
      return new ResponseEntity<>(
          CSVUtil.readRecordsFromFile(csvPath, CSVFormat.DEFAULT)
              .map(r -> r.iterator())
              .subscribeOn(Schedulers.io()).toList()
              .blockingGet(),
          HttpStatus.OK);
    } else {
      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
    }
  }

  //Lazy holder idiom for initializing the singleton map
  private static class CSVMapping {

    private static final ResourceBundle rb = ResourceBundle.getBundle("app");
    private static final Map<String, Path> CSVMap = getCSVMap();

    private static boolean isCSV(Path path) {
      try {
        val contentType = Files.probeContentType(path);
        if (contentType != null && contentType.equals("text/csv")) {
          return true;
        }
      } catch (IOException e) {
        log.error("Unable to probe content type", e);
      }
      return false;
    }

    private static void registerFileWatcher(Path path, Map<String, Path> map) {
      try {
        val watchService = FileSystems.getDefault().newWatchService();
        path.register(watchService, ENTRY_CREATE, ENTRY_DELETE);
        // Pass the scheduler to interval(): subscribeOn() alone leaves the ticks on the computation scheduler
        Flowable.interval(5, TimeUnit.SECONDS, Schedulers.io()).forEach(t -> {
          val key = watchService.poll();
          if (key != null) {
            for (WatchEvent<?> event : key.pollEvents()) {
              val kind = event.kind();
              if (kind == OVERFLOW) {
                continue;
              }
              // For create/delete events the context is the path relative to the watched directory
              Path dir = (Path) key.watchable();
              Path fullPath = dir.resolve((Path) event.context());
              String name = fullPath.getFileName().toString();
              if (kind == ENTRY_CREATE && isCSV(fullPath)) {
                log.debug("New CSV file detected {}", fullPath);
                map.put(name, fullPath);
              } else if (kind == ENTRY_DELETE && map.remove(name) != null) {
                // The file is already gone, so drop it by name instead of probing its content type
                log.debug("CSV file {} deleted", fullPath);
              }
            }
            key.reset();
          }
        });

      } catch (IOException e) {
        log.error("error occurred", e);
        //ignore or throw
      }
    }

    private static Map<String, Path> getCSVMap() {
      final Map<String, Path> csvMap = new ConcurrentHashMap<>();
      val path = Paths.get(rb.getString("directory_loc"));
      // Seed the map with the CSV files already present in the directory
      try (Stream<Path> files = Files.list(path).map(Path::toAbsolutePath)
          .filter(CSVFileWatcher.CSVMapping::isCSV)) {
        files.parallel()
            .forEach(p -> csvMap.put(p.getFileName().toString(), p));
      } catch (IOException e) {
        log.error("Error occurred", e);
        throw new RuntimeException(e);
      }
      // Keep the map in sync with later file creations and deletions
      registerFileWatcher(path, csvMap);
      return csvMap;
    }
    }

  }
}
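
If you want to try the watch logic on its own, without Spring, RxJava or Lombok, the sketch below registers a watcher on a temporary directory, creates one file and reports what the watcher saw. It swaps the 5-second interval stream for a single blocking poll with a timeout, so it is not how the service above schedules its polling. WatchServiceDemo and watchOneCreation are hypothetical names for this illustration, and how quickly the event arrives depends on the platform's WatchService implementation.

```java
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the service in this post.
class WatchServiceDemo {

  // Watches a fresh temp directory, creates fileName in it, and returns the
  // names reported as created within the timeout.
  static List<String> watchOneCreation(String fileName, long timeoutSeconds) {
    List<String> created = new ArrayList<>();
    try {
      Path dir = Files.createTempDirectory("watch-demo");
      Path file = dir.resolve(fileName);
      try (WatchService watcher = dir.getFileSystem().newWatchService()) {
        dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE);
        Files.createFile(file);

        // Block until a key is signalled or the timeout expires (null on timeout).
        WatchKey key = watcher.poll(timeoutSeconds, TimeUnit.SECONDS);
        if (key != null) {
          for (WatchEvent<?> event : key.pollEvents()) {
            if (event.kind() == OVERFLOW) {
              continue; // events were lost; there is no file name to read
            }
            if (event.kind() == ENTRY_CREATE) {
              // The context is the path relative to the watched directory.
              created.add(event.context().toString());
            }
          }
          key.reset(); // re-arm the key so it can be signalled again
        }
      } finally {
        Files.deleteIfExists(file);
        Files.deleteIfExists(dir);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return created;
  }

  public static void main(String[] args) {
    System.out.println("created: " + watchOneCreation("sample.csv", 30));
  }
}
```

The generous timeout is deliberate: some platforms fall back to a polling implementation of WatchService, which can take several seconds to notice a change.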