PURIFY
Next-generation radio interferometric imaging
h5reader.h
Go to the documentation of this file.
1 #ifndef PURIFY_H5READER_H
2 #define PURIFY_H5READER_H
3 #include "purify/config.h"
4 #include "purify/types.h"
5 #include "purify/logging.h"
6 #include "purify/uvw_utilities.h"
7 
8 #ifdef PURIFY_MPI
9 #include <mpi.h>
10 #include <sopt/mpi/communicator.h>
11 #endif
12 
13 #include "highfive/H5File.hpp"
14 
15 #include <algorithm>
16 #include <map>
17 #include <memory>
18 #include <random>
19 #include <string>
20 #include <vector>
21 
22 #ifndef PURIFY_MPI
23 namespace sopt::mpi {
24 class Communicator;
25 }
26 #endif
27 
28 namespace purify::H5 {
29 
30 #ifdef PURIFY_MPI
31 
32 HighFive::FileAccessProps MPIFileAccess() {
33  HighFive::FileAccessProps fap;
34  fap.add(HighFive::MPIOFileAccess{MPI_COMM_WORLD, MPI_INFO_NULL});
35  fap.add(HighFive::MPIOCollectiveMetadata{});
36  return fap;
37 }
38 
39 HighFive::DataTransferProps MPIDataTransfer() {
40  HighFive::DataTransferProps dtp;
41  dtp.add(HighFive::UseCollectiveIO{});
42  return dtp;
43 }
44 
45 #endif
46 
48 class H5Handler {
49  using DatsetMap = std::map<std::string, HighFive::DataSet>;
50 
51  public:
52  H5Handler() = delete;
53 
55  H5Handler(const std::string& filename)
56  : _comm(nullptr),
57  _fap(HighFive::FileAccessProps{}),
58  _dtp(HighFive::DataTransferProps{}),
59  _file(filename, HighFive::File::ReadOnly) {
60  _datalen = _slicepos = _slicelen = _batchpos = 0;
61  }
62 
63 #ifdef PURIFY_MPI
65  H5Handler(const std::string& filename, const sopt::mpi::Communicator& comm)
66  : _comm(&comm),
67  _fap(MPIFileAccess()),
68  _dtp(MPIDataTransfer()),
69  _file(filename, HighFive::File::ReadOnly, _fap) {
70  _datalen = _slicepos = _slicelen = _batchpos = 0;
71  }
72 #endif
73 
75  template <typename T = double>
76  std::vector<T> read(const std::string& label) const {
77  auto dataset = _file.getDataSet(label);
78  return dataset.read<std::vector<T>>();
79  }
80 
83  template <typename T = double>
84  std::vector<T> distread(const std::string& label) {
85  if (!_comm) throw std::runtime_error("No MPI-collective reading enabled!");
86  _loadDataSet(label);
87  std::vector<T> data;
88  data.reserve(_slicelen);
89  _ds[label].select({_slicepos}, {_slicelen}).read(data, _dtp);
90  return data;
91  }
92 
95  template <typename T = double>
96  std::vector<T> stochread(const std::string& label, size_t batchsize, bool shuffle = false) {
97  if (!_comm) throw std::runtime_error("No MPI-collective reading enabled!");
98 
99  _loadDataSet(label);
100  if (shuffle) _shuffle();
101 
102  std::vector<T> data;
103  data.reserve(batchsize);
104  // account for wrap around near
105  // the edges of the slice
106  size_t pos = _batchpos;
107  while (batchsize) {
108  std::vector<T> tmp;
109  size_t len = std::min(batchsize, _slicepos + _slicelen - pos);
110  _ds[label].select({pos}, {len}).read(tmp, _dtp);
111  data.insert(data.end(), std::make_move_iterator(std::begin(tmp)),
112  std::make_move_iterator(std::end(tmp)));
113  pos = _slicepos;
114  batchsize -= len;
115  }
116  return data;
117  }
118 
119  private:
120  void _loadDataSet(const std::string& label) {
121 #ifdef PURIFY_MPI
122  if (_ds.find(label) != _ds.end()) return;
123 
124  _ds[label] = std::move(_file.getDataSet(label));
125  const auto& dims = _ds[label].getDimensions();
126  size_t len = dims.at(0);
127  if (len == 0) throw std::runtime_error("Dataset has zero length!");
128  if (len < _comm->size()) throw std::runtime_error("Not enough data for each MPI rank!");
129  if (_datalen == 0) {
130  _datalen = len;
131  // Determine starting position and length of the data slice
132  _slicelen = _datalen / _comm->size();
133  _slicepos = _comm->rank() * _slicelen;
134  if (_comm->rank() == _comm->size() - 1) {
135  _slicelen += _datalen % _comm->size();
136  }
137  } else if (len != _datalen) {
138  throw std::runtime_error("Inconsistent dataset length!");
139  }
140 #endif
141  }
142 
143  void _shuffle() {
144  std::uniform_int_distribution<size_t> uni(_slicepos, _slicepos + _slicelen - 1);
145  _batchpos = uni(_rng);
146  }
147 
148  const sopt::mpi::Communicator* _comm;
149 
150  const HighFive::FileAccessProps _fap;
151 
152  const HighFive::DataTransferProps _dtp;
153 
154  const HighFive::File _file;
155 
156  DatsetMap _ds;
157 
158  std::mt19937 _rng;
159 
160  size_t _datalen, _slicepos, _slicelen, _batchpos;
161 };
162 
166 utilities::vis_params read_visibility(const std::string& vis_name, const bool w_term) {
167  H5Handler vis_file(vis_name);
168  utilities::vis_params uv_vis;
169 
170  std::vector<t_real> utemp = vis_file.read<t_real>("u");
171  uv_vis.u = Eigen::Map<Vector<t_real>>(utemp.data(), utemp.size(), 1);
172 
173  // found that a reflection is needed for the orientation
174  // of the gridded image to be correct
175  std::vector<t_real> vtemp = vis_file.read<t_real>("v");
176  uv_vis.v = -Eigen::Map<Vector<t_real>>(vtemp.data(), vtemp.size(), 1);
177 
178  if (w_term) {
179  std::vector<t_real> wtemp = vis_file.read<t_real>("w");
180  uv_vis.w = Eigen::Map<Vector<t_real>>(wtemp.data(), wtemp.size(), 1);
181  } else {
182  uv_vis.w = Vector<t_real>::Zero(utemp.size());
183  }
184 
185  std::vector<t_real> retemp = vis_file.read<t_real>("re");
186  std::vector<t_real> imtemp = vis_file.read<t_real>("im");
187  std::vector<t_real> sigma = vis_file.read<t_real>("sigma");
188  assert(retemp.size() == imtemp.size());
189 
190  uv_vis.vis = Vector<t_complex>::Zero(retemp.size());
191  uv_vis.weights = Vector<t_complex>::Zero(retemp.size());
192  for (size_t i = 0; i < retemp.size(); ++i) {
193  uv_vis.vis(i) = t_complex(retemp[i], imtemp[i]);
194  uv_vis.weights(i) = 1 / sigma[i];
195  }
196 
197  uv_vis.ra = 0;
198  uv_vis.dec = 0;
199  uv_vis.average_frequency = 0;
200 
201  return uv_vis;
202 }
203 
206 utilities::vis_params stochread_visibility(H5Handler& file, const size_t N, const bool w_term) {
207  utilities::vis_params uv_vis;
208 
209  std::vector<t_real> utemp =
210  file.stochread<t_real>("u", N, true); //< shuffle batch starting position
211  uv_vis.u = Eigen::Map<Vector<t_real>>(utemp.data(), utemp.size(), 1);
212 
213  // found that a reflection is needed for the orientation
214  // of the gridded image to be correct
215  std::vector<t_real> vtemp = file.stochread<t_real>("v", N);
216  uv_vis.v = -Eigen::Map<Vector<t_real>>(vtemp.data(), vtemp.size(), 1);
217 
218  if (w_term) {
219  std::vector<t_real> wtemp = file.stochread<t_real>("w", N);
220  uv_vis.w = Eigen::Map<Vector<t_real>>(wtemp.data(), wtemp.size(), 1);
221  } else {
222  uv_vis.w = Vector<t_real>::Zero(utemp.size());
223  }
224 
225  std::vector<t_real> retemp = file.stochread<t_real>("re", N);
226  std::vector<t_real> imtemp = file.stochread<t_real>("im", N);
227  std::vector<t_real> sigma = file.stochread<t_real>("sigma", N);
228 
229  uv_vis.vis = Vector<t_complex>::Zero(retemp.size());
230  uv_vis.weights = Vector<t_complex>::Zero(retemp.size());
231  for (size_t i = 0; i < retemp.size(); ++i) {
232  uv_vis.vis(i) = t_complex(retemp[i], imtemp[i]);
233  uv_vis.weights(i) = 1 / sigma[i];
234  }
235 
236  uv_vis.ra = 0;
237  uv_vis.dec = 0;
238  uv_vis.average_frequency = 0;
239 
240  return uv_vis;
241 }
242 
244 void write_visibility(const utilities::vis_params& uv_vis, const std::string& h5name,
245  const bool w_term, const size_t chunksize = 0) {
246  // Set up HDF5 file
247  HighFive::File h5file(h5name, HighFive::File::OpenOrCreate | HighFive::File::Truncate);
248  // Set up file properties, such as chunking and compression
249  // Note: the I/O is minimised if compression is disabled (obvs)
250  // If using decompressed data is not an option, then the
251  // I/O performance can be optimised by chunking the dataset
252  // in such a way that each MPI rank only has to decompress
253  // its allocated segment (or a subset thereof)
254  HighFive::DataSetCreateProps props;
255  if (uv_vis.u.size()) {
256  if (chunksize > 0) {
257  props.add(HighFive::Chunking(std::vector<hsize_t>{chunksize}));
258  } else {
259  props.add(HighFive::Chunking(std::vector<hsize_t>{static_cast<hsize_t>(uv_vis.u.size())}));
260  }
261  props.add(HighFive::Deflate(9)); // maximal compression
262  }
263  // Create the H5 datasets
264  h5file.createDataSet("u", std::vector<t_real>(uv_vis.u.data(), uv_vis.u.data() + uv_vis.u.size()),
265  props);
266  h5file.createDataSet("v", std::vector<t_real>(uv_vis.v.data(), uv_vis.v.data() + uv_vis.v.size()),
267  props);
268  if (w_term) {
269  h5file.createDataSet(
270  "w", std::vector<t_real>(uv_vis.w.data(), uv_vis.w.data() + uv_vis.w.size()), props);
271  }
272 
273  std::vector<t_real> redata, imdata, sigma;
274  redata.reserve(uv_vis.vis.size());
275  imdata.reserve(uv_vis.vis.size());
276  sigma.reserve(uv_vis.weights.size());
277  for (size_t i = 0; i < uv_vis.vis.size(); ++i) {
278  redata.push_back(uv_vis.vis(i).real());
279  imdata.push_back(uv_vis.vis(i).imag());
280  sigma.push_back(1.0 / uv_vis.weights(i).real());
281  }
282  h5file.createDataSet("re", std::move(redata), props);
283  h5file.createDataSet("im", std::move(imdata), props);
284  h5file.createDataSet("sigma", std::move(imdata), props);
285 }
286 
287 } // namespace purify::H5
288 
289 #endif
Purify interface class to handle HDF5 input files.
Definition: h5reader.h:48
std::vector< T > read(const std::string &label) const
Method to read the entire dataset.
Definition: h5reader.h:76
std::vector< T > distread(const std::string &label)
Definition: h5reader.h:84
std::vector< T > stochread(const std::string &label, size_t batchsize, bool shuffle=false)
Definition: h5reader.h:96
H5Handler(const std::string &filename)
Default constructor (serial behaviour)
Definition: h5reader.h:55
utilities::vis_params read_visibility(const std::string &vis_name, const bool w_term)
Reads an HDF5 file with u,v visibilities, constructs a vis_params object and returns it.
Definition: h5reader.h:166
utilities::vis_params stochread_visibility(H5Handler &file, const size_t N, const bool w_term)
Stochastically reads dataset slices from the supplied HDF5-file handler, constructs a vis_params obje...
Definition: h5reader.h:206
void write_visibility(const utilities::vis_params &uv_vis, const std::string &h5name, const bool w_term, const size_t chunksize=0)
Write an HDF5 file with u,v visibilities from a vis_params object.
Definition: h5reader.h:244
Vector< t_complex > vis
Definition: uvw_utilities.h:22
Vector< t_complex > weights
Definition: uvw_utilities.h:23