PURIFY
Next-generation radio interferometric imaging
distribute.cc
Go to the documentation of this file.
1 #include "purify/distribute.h"
3 
4 namespace purify::distribute {
5 
6 std::vector<t_int> distribute_measurements(Vector<t_real> const &u, Vector<t_real> const &v,
7  Vector<t_real> const &w, t_int const number_of_nodes,
8  distribute::plan const distribution_plan,
9  t_int const &grid_size) {
10  // distrubte visibilities from a measurement
11  Vector<t_int> index = Vector<t_int>::LinSpaced(u.size(), 0, u.size());
12  t_int const partition_size =
13  std::ceil(static_cast<t_real>(u.size()) / static_cast<t_real>(number_of_nodes));
14  // return a vector of vectors of indicies for each node
15  std::string plan_name = "";
16  switch (distribution_plan) {
17  case plan::none: {
18  plan_name = "none";
19  break;
20  }
21  case plan::equal: {
22  index = equal_distribution(u, v, grid_size);
23  plan_name = "equal";
24  break;
25  }
26  case plan::radial: {
27  index = distance_distribution(u, v);
28  plan_name = "radial";
29  break;
30  }
31  case plan::w_term: {
32  index = w_distribution(w);
33  plan_name = "w_term";
34  break;
35  }
36  default: {
37  throw std::runtime_error("Distribution plan not recognised or implimented.");
38  break;
39  }
40  }
42  "Using {} to make {} partitions from {} visibilities, with {} visibilities per a node.",
43  plan_name, number_of_nodes, index.size(), partition_size);
44  std::vector<t_int> partitions(u.size());
45  if (std::floor(static_cast<t_real>(index.size() - 1) / static_cast<t_real>(partition_size)) >
46  number_of_nodes - 1) {
47  PURIFY_ERROR("Error: Probably a bug in distribution plan.");
48  throw std::runtime_error("Distributing data into too many nodes");
49  }
50  // creating partitions
51  for (t_int i = 0; i < index.size(); ++i) {
52  partitions[index(i)] = std::floor(static_cast<t_real>(i) / static_cast<t_real>(partition_size));
53  }
54  return partitions;
55 }
56 
57 Vector<t_int> w_distribution(const Vector<t_real> &u, const Vector<t_real> &v,
58  Vector<t_real> const &w) {
59  return w_distribution(w);
60 }
61 
62 Vector<t_int> w_distribution(Vector<t_real> const &w) {
63  // sort visibilities by w from w_max to w_min
64  Vector<t_int> index = Vector<t_int>::LinSpaced(w.size(), 0, w.size());
65  std::sort(index.data(), index.data() + index.size(), [&w](int a, int b) { return w(a) < w(b); });
66  return index;
67 }
68 
69 Vector<t_int> distance_distribution(Vector<t_real> const &u, Vector<t_real> const &v) {
70  // sort visibilities by distance from centre
71  Vector<t_int> index = Vector<t_int>::LinSpaced(u.size(), 0, u.size());
72  std::sort(index.data(), index.data() + index.size(), [&u, &v](int a, int b) {
73  return std::sqrt(u(a) * u(a) + v(a) * v(a)) < std::sqrt(u(b) * u(b) + v(b) * v(b));
74  });
75  return index;
76 }
77 
78 Vector<t_int> equal_distribution(Vector<t_real> const &u, Vector<t_real> const &v,
79  t_int const &grid_size) {
80  // distribute visibilities by density calculated from histogram
81  t_real const max_u = u.array().maxCoeff();
82  t_real const max_v = v.array().maxCoeff();
83  t_real const min_u = u.array().minCoeff();
84  t_real const min_v = v.array().minCoeff();
85  // scaling for histogram grid
86  Vector<t_real> const scaled_u = (u.array() - min_u) * (grid_size - 1) / (max_u - min_u);
87  Vector<t_real> const scaled_v = (v.array() - min_v) * (grid_size - 1) / (max_v - min_v);
88  Image<t_int> histogram = Image<t_int>::Zero(grid_size, grid_size);
89  Vector<t_int> index = Vector<t_int>::LinSpaced(u.size(), 0, u.size());
90  // creating histogram grid
91  for (t_int i = 0; i < index.size(); i++) {
92  histogram((t_int)std::floor(scaled_u(i)), (t_int)std::floor(scaled_v(i))) += 1;
93  }
94  // sorting indicies
95  std::sort(index.data(), index.data() + index.size(),
96  [&histogram, &scaled_u, &scaled_v](int a, int b) {
97  return (histogram((t_int)std::floor(scaled_u(a)), (t_int)std::floor(scaled_v(a))) <
98  histogram((t_int)std::floor(scaled_u(b)), (t_int)std::floor(scaled_v(b))));
99  });
100  return index;
101 }
102 
103 std::tuple<std::vector<t_int>, std::vector<t_real>> kmeans_algo(
104  const Vector<t_real> &w, const t_int number_of_nodes, const t_int iters,
105  const std::function<t_real(t_real)> &cost, const t_real rel_diff) {
106  std::vector<t_int> w_node(w.size(), 0);
107  std::vector<t_real> w_centre(number_of_nodes, 0);
108  std::vector<t_real> w_sum(number_of_nodes, 0);
109  std::vector<t_real> w_count(number_of_nodes, 0);
110  t_real diff = 1;
111  t_real const wmin = w.minCoeff();
112  t_real const wmax = w.maxCoeff();
113  PURIFY_DEBUG("Min w = {}", wmin);
114  PURIFY_DEBUG("Max w = {}", wmax);
115  for (int i = 0; i < w_centre.size(); i++)
116  w_centre[i] = (i * (wmax - wmin) / number_of_nodes + wmin);
117  // lopp through even nodes to reduces w-term
118  for (int n = 0; n < iters; n++) {
119  PURIFY_DEBUG("clustering iteration {}", n);
120  for (int i = 0; i < w.size(); i++) {
121  t_real min = 2 * cost(wmax - wmin);
122  for (int node = 0; node < number_of_nodes; node++) {
123  const t_real cost_val = cost(w(i) - w_centre.at(node));
124  if (cost_val < min) {
125  min = cost_val;
126  w_node[i] = node;
127  }
128  }
129  w_sum[w_node.at(i)] += w(i);
130  w_count[w_node.at(i)]++;
131  }
132  for (int j = 0; j < number_of_nodes; j++) {
133  diff += std::abs(((w_count.at(j) > 0) ? w_sum.at(j) / w_count.at(j) : 0) - w_centre.at(j)) /
134  std::abs(w_centre.at(j) * number_of_nodes);
135  w_centre[j] = (w_count.at(j) > 0) ? w_sum.at(j) / w_count.at(j) : 0;
136  PURIFY_DEBUG("Node {} has {} visibilities, using w-stack w = {}.", j, w_count.at(j),
137  w_centre.at(j));
138  w_sum[j] = 0;
139  w_count[j] = 0;
140  }
141  if (diff < rel_diff) {
142  PURIFY_DEBUG("Converged!");
143  break;
144  } else {
145  PURIFY_DEBUG("relative_diff = {}", diff);
146  diff = 0;
147  }
148  }
149 
150  return std::make_tuple(w_node, w_centre);
151 }
152 
153 #ifdef PURIFY_MPI
154 std::tuple<std::vector<t_int>, std::vector<t_real>> kmeans_algo(
155  const Vector<t_real> &w, const t_int number_of_nodes, const t_int iters,
156  sopt::mpi::Communicator const &comm, const std::function<t_real(t_real)> &cost,
157  const t_real rel_diff) {
158  std::vector<t_int> w_node(w.size(), 0);
159  std::vector<t_real> w_centre(number_of_nodes, 0);
160  std::vector<t_real> w_sum(number_of_nodes, 0);
161  std::vector<t_real> w_count(number_of_nodes, 0);
162  t_real diff = 1;
163  t_real const wmin = comm.all_reduce<t_real>(w.minCoeff(), MPI_MIN);
164  t_real const wmax = comm.all_reduce<t_real>(w.maxCoeff(), MPI_MAX);
165  if (comm.is_root()) PURIFY_DEBUG("Min w = {}", wmin);
166  if (comm.is_root()) PURIFY_DEBUG("Max w = {}", wmax);
167  for (int i = 0; i < w_centre.size(); i++)
168  w_centre[i] =
169  (static_cast<t_real>(i) * (wmax - wmin) / static_cast<t_real>(number_of_nodes) + wmin);
170  // loop through even nodes to reduce w-term
171  for (int n = 0; n < iters; n++) {
172  if (comm.is_root()) PURIFY_DEBUG("clustering iteration {}", n);
173  for (int i = 0; i < w.size(); i++) {
174  t_real min = 2 * cost(wmax - wmin);
175  for (int node = 0; node < number_of_nodes; node++) {
176  const t_real cost_val = cost(w(i) - w_centre.at(node));
177  if (cost_val < min) {
178  min = cost_val;
179  w_node[i] = node;
180  }
181  }
182  w_sum[w_node.at(i)] += w(i);
183  w_count[w_node.at(i)]++;
184  }
185  for (int j = 0; j < number_of_nodes; j++) {
186  const t_real global_w_sum = comm.all_sum_all<t_real>(w_sum.at(j));
187  const t_real global_w_count = comm.all_sum_all<t_real>(w_count.at(j));
188  diff +=
189  std::abs(((global_w_count > 0) ? global_w_sum / global_w_count : 0) - w_centre.at(j)) /
190  std::abs(w_centre.at(j) * number_of_nodes);
191  w_centre[j] = (global_w_count > 0) ? global_w_sum / global_w_count : 0;
192  if (comm.is_root())
193  PURIFY_DEBUG("Node {} has {} visibilities, using w-stack w = {}.", j, global_w_count,
194  w_centre.at(j));
195  w_sum[j] = 0;
196  w_count[j] = 0;
197  }
198  if (diff < rel_diff) {
199  if (comm.is_root()) PURIFY_DEBUG("Converged!");
200  break;
201  } else {
202  if (comm.is_root()) PURIFY_DEBUG("relative_diff = {}", diff);
203  diff = 0;
204  }
205  }
206 
207  return std::make_tuple(w_node, w_centre);
208 }
209 
210 std::vector<t_int> w_support(Vector<t_real> const &w, const std::vector<t_int> &image_index,
211  const std::vector<t_real> &w_stacks, const t_real du,
212  const t_int min_support, const t_int max_support,
213  const t_real fill_relaxation, sopt::mpi::Communicator const &comm) {
214  t_real coeff_total = 0;
215  for (t_int i = 0; i < w.size(); i++)
216  coeff_total += widefield::w_support(std::abs(w(i) - w_stacks.at(image_index.at(i))), du,
217  min_support, max_support) *
218  widefield::w_support(std::abs(w(i) - w_stacks.at(image_index.at(i))), du,
219  min_support, max_support);
220  const t_real coeff_average =
221  comm.all_sum_all<t_real>(coeff_total) / static_cast<t_real>(comm.size());
222  if (comm.is_root()) {
223  PURIFY_DEBUG("Each node should have on average {} coefficients.", coeff_average);
224  }
225  t_real coeff_sum = 0;
226  t_int group = 0;
227  std::vector<t_int> groups(w.size(), comm.rank());
228  std::vector<t_int> coeffs(comm.size(), comm.rank());
229  long long int total = 0;
230  for (t_int rank = 0; rank < comm.size(); rank++) {
231  const auto size = comm.broadcast(w.size(), rank);
232  for (t_int i = 0; i < size; i++) {
233  if (comm.rank() == rank) {
234  const t_int cost = widefield::w_support(std::abs(w(i) - w_stacks.at(image_index.at(i))), du,
235  min_support, max_support) *
236  widefield::w_support(std::abs(w(i) - w_stacks.at(image_index.at(i))), du,
237  min_support, max_support);
238  total += cost;
239  if (group < (comm.size() - 1))
240  if ((cost + coeff_sum) > coeff_average * (1. + fill_relaxation)) {
241  PURIFY_DEBUG("{} node should have {} coefficients.", group, coeff_sum);
242  coeff_sum = 0;
243  group++;
244  }
245  coeff_sum += cost;
246  groups[i] = group;
247  }
248  }
249  if (group > comm.size() - 1) {
250  throw std::runtime_error(
251  "Error distributing visibilites to even computational load for wide field imaging. Group "
252  "number out of bounds.");
253  }
254  coeff_sum = comm.broadcast(coeff_sum, rank);
255  group = comm.broadcast(group, rank);
256 
257  if (total != coeff_total and comm.rank() == rank) {
258  throw std::runtime_error(
259  "Total number of coefficients calculated is not the same, loop might be broken. " +
260  std::to_string(total) + " != " + std::to_string(coeff_total));
261  }
262  }
263  if (comm.is_root()) PURIFY_DEBUG("{} node should have {} coefficients.", group, coeff_sum);
264  return groups;
265 }
266 #endif
267 
268 } // namespace purify::distribute
#define PURIFY_ERROR(...)
\macro Something is definitely wrong, algorithm exits
Definition: logging.h:190
#define PURIFY_DEBUG(...)
\macro Output some debugging
Definition: logging.h:197
const std::vector< t_real > u
data for u coordinate
Definition: operators.cc:18
const std::vector< t_real > v
data for v coordinate
Definition: operators.cc:20
Vector< t_int > w_distribution(const Vector< t_real > &u, const Vector< t_real > &v, Vector< t_real > const &w)
Distribute visibilities into nodes in order of w terms (useful for w-stacking)
Definition: distribute.cc:57
std::tuple< std::vector< t_int >, std::vector< t_real > > kmeans_algo(const Vector< t_real > &w, const t_int number_of_nodes, const t_int iters, const std::function< t_real(t_real)> &cost, const t_real rel_diff)
patition w terms using k-means
Definition: distribute.cc:103
Vector< t_int > equal_distribution(Vector< t_real > const &u, Vector< t_real > const &v, t_int const &grid_size)
Distribute the visiblities into nodes in order of density.
Definition: distribute.cc:78
std::vector< t_int > distribute_measurements(Vector< t_real > const &u, Vector< t_real > const &v, Vector< t_real > const &w, t_int const number_of_nodes, distribute::plan const distribution_plan, t_int const &grid_size)
Distribute visiblities into groups.
Definition: distribute.cc:6
Vector< t_int > distance_distribution(Vector< t_real > const &u, Vector< t_real > const &v)
Distribute visiblities into nodes in order of distance from the centre.
Definition: distribute.cc:69
t_int w_support(const t_real w, const t_real du, const t_int min, const t_int max)
estimate support size of w given u resolution du