odiststream(2) distributed interface for large data streams

Other Alias

idiststream

DESCRIPTION

This class provides a parallel and distributed stream interface for large data management. Files are assumed to be compressed with gzip and are decompressed on input, and a recursive search in a directory list is provided for input.

     odiststream foo("NAME", "suffix");

is similar to

     ofstream foo("NAME.suffix").

The main difference is that writing to an ofstream is executed on all processes, while odiststream handles the multi-process environment. For convenience, the standard streams cin, cout and cerr are extended to din, dout and derr. Notice that, as with orheostream (see rheostream(2)), if NAME does not end with `.suffix', then `.suffix' is automatically added. By default, compression is performed on the fly with gzip, adding an additional `.gz' suffix. The flush action is also supported in compression mode:

     foo.flush();

This feature allows intermediate results to be available during long computations. The compression can be deactivated while opening a file by an optional argument:

     odiststream foo("NAME", "suffix", io::nogz);
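
The naming rules described above (suffix completion, then an optional `.gz') can be illustrated by a small stand-alone sketch. The function output_file_name is a hypothetical helper written for this page, not part of the library, and the boolean use_gzip stands in for the absence of io::nogz. How the library treats an empty suffix or a NAME that already ends with `.gz' is not stated here, so the sketch simply leaves an empty suffix out and does not detect an existing `.gz'.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper, not part of rheolef: computes the file name implied
// by the description of odiststream foo(name, suffix [, io::nogz]).
std::string output_file_name(const std::string& name,
                             const std::string& suffix,
                             bool use_gzip = true) {
  assert(!name.empty());
  std::string result = name;
  if (!suffix.empty()) {
    const std::string ext = "." + suffix;
    // Append ".suffix" only when the name does not already end with it.
    const bool has_ext =
        result.size() >= ext.size() &&
        result.compare(result.size() - ext.size(), ext.size(), ext) == 0;
    if (!has_ext) result += ext;
  }
  // Default mode: on-the-fly gzip compression adds a second suffix.
  if (use_gzip) result += ".gz";
  return result;
}
```

With these assumptions, output_file_name("NAME", "suffix") and output_file_name("NAME.suffix", "suffix") both give NAME.suffix.gz, while passing false as the third argument gives NAME.suffix.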

An existing compressed file can be reopened in append mode, so that new results are appended at the end of the existing file:

     odiststream foo("NAME", "suffix", io::app);
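
The difference with ofstream in a multi-process run can be sketched without any parallel library. The class single_writer below is a hypothetical stand-in, not the rheolef implementation: the process rank is passed explicitly instead of being taken from a communicator, and the choice of process 0 as the one doing the output is an assumption of this sketch. The member names io_proc and nop merely echo those of the class interface; their exact semantics in the library are not documented on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical stand-in for odiststream: every process calls operator<<,
// but only one of them actually writes to the underlying stream.
class single_writer {
public:
  single_writer(std::ostream& os, std::size_t rank) : _os(os), _rank(rank) {}
  // Assumption of this sketch: process 0 performs the output.
  static std::size_t io_proc() { return 0; }
  // True when this process must not touch the underlying stream.
  bool nop() const { return _rank != io_proc(); }
  template <class T>
  single_writer& operator<<(const T& value) {
    if (!nop()) _os << value;
    return *this;
  }
private:
  std::ostream& _os;
  std::size_t   _rank;
};

// Emulates nproc processes writing the same line to a shared sink and
// returns the content of the sink afterwards.
std::string emulate(std::size_t nproc) {
  assert(nproc > 0);
  std::ostringstream sink;
  for (std::size_t rank = 0; rank < nproc; ++rank) {
    single_writer out(sink, rank);
    out << "residual = " << 0.5 << "\n";
  }
  return sink.str();
}
```

In this emulation the sink holds a single line whatever the number of processes, whereas a plain ofstream written by every process would receive one copy per process.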

Conversely,

        irheostream foo("NAME","suffix");

is similar to

        ifstream foo("NAME.suffix").

idiststream also handles the multi-process environment. However, the search path given by the environment variable RHEOPATH is used to find NAME, while the suffix is assumed. Moreover, the file is assumed to be gzip-compressed, ending with the `.gz' suffix, and decompression is performed.
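
The lookup described above can be pictured as building a list of candidate file names from the search path. The sketch below is only an illustration under several assumptions that this page does not confirm: directories are separated by `:', the compressed name is tried before the uncompressed one, and the search is not recursive. The function candidate_paths and the sample names used with it are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper, not part of rheolef: lists the file names that this
// sketch would try, in order, for NAME and suffix along a search path.
std::vector<std::string> candidate_paths(const std::string& name,
                                         const std::string& suffix,
                                         const std::string& search_path) {
  assert(!name.empty());
  // Complete the name with ".suffix" when it is missing.
  std::string file = name;
  const std::string ext = "." + suffix;
  const bool has_ext =
      file.size() >= ext.size() &&
      file.compare(file.size() - ext.size(), ext.size(), ext) == 0;
  if (!suffix.empty() && !has_ext) file += ext;

  std::vector<std::string> out;
  std::size_t begin = 0;
  while (begin <= search_path.size()) {
    // Assumed separator: ':' between directories.
    std::size_t end = search_path.find(':', begin);
    if (end == std::string::npos) end = search_path.size();
    const std::string dir = search_path.substr(begin, end - begin);
    if (!dir.empty()) {
      out.push_back(dir + "/" + file + ".gz");  // compressed file first
      out.push_back(dir + "/" + file);          // then the plain file
    }
    begin = end + 1;
  }
  return out;
}
```

A caller could pass the value of std::getenv("RHEOPATH") as search_path and open the first candidate that exists; for the illustrative arguments "mesh", "geo" and "/data:/tmp/run", the first candidate is /data/mesh.geo.gz.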

IMPLEMENTATION

class odiststream {
public:
  typedef std::size_t size_type;
// allocators/deallocators:
  odiststream();
  odiststream (std::string filename, std::string suffix = "",
              io::mode_type mode = io::out, const communicator& comm = communicator());
  odiststream (std::string filename,
              io::mode_type mode, const communicator& comm = communicator());
  odiststream (std::string filename, std::string suffix, const communicator& comm);
  odiststream (std::string filename, const communicator& comm);
  odiststream(std::ostream& os, const communicator& comm = communicator());
  ~odiststream();
// modifiers:
   void open (std::string filename, std::string suffix = "",
             io::mode_type mode = io::out, const communicator& comm = communicator());
   void open (std::string filename,
             io::mode_type mode, const communicator& comm = communicator());
   void open (std::string filename, std::string suffix,
             const communicator& comm);
   void open (std::string filename, const communicator& comm);
   void flush();
   void close();
// accessors:
   const communicator& comm() const { return _comm; }
   bool good() const;
   operator bool() const { return good(); }
   static size_type io_proc();
// internals:
   std::ostream& os();
   bool nop();
protected:
// data:
   std::ostream* _ptr_os;
   bool          _use_alloc;
   communicator  _comm;
};

IMPLEMENTATION

class idiststream {
public:
  typedef std::size_t size_type;
// allocators/deallocators:
  idiststream();
  idiststream (std::istream& is, const communicator& comm = communicator());
  idiststream (std::string filename, std::string suffix = "",
             const communicator& comm = communicator());
  ~idiststream();
// modifiers:
  void open (std::string filename, std::string suffix = "",
             const communicator& comm = communicator());
  void close();
// accessors:
  const communicator& comm() const { return _comm; }
  bool good() const;
  operator bool() const { return good(); }
  static size_type io_proc();
// internals:
  std::istream& is();
  bool nop();
  bool do_load();
protected:
// data:
  std::istream* _ptr_is;
  bool          _use_alloc;
  communicator  _comm;
};