libpqxx
The C++ client library for PostgreSQL
stream_query.hxx
1 /* Definition of the pqxx::internal::stream_query class.
2  *
3  * Enables optimized batch reads from a database table.
4  *
5  * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/stream_query instead.
6  *
7  * Copyright (c) 2000-2024, Jeroen T. Vermeulen.
8  *
9  * See COPYING for copyright license. If you did not receive a file called
10  * COPYING with this source code, please notify the distributor of this
11  * mistake, or contact the author.
12  */
13 #ifndef PQXX_H_STREAM_QUERY
14 #define PQXX_H_STREAM_QUERY
15 
16 #if !defined(PQXX_HEADER_PRE)
17 # error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
18 #endif
19 
20 #include <cassert>
21 #include <functional>
22 #include <variant>
23 
24 #include "pqxx/connection.hxx"
25 #include "pqxx/except.hxx"
26 #include "pqxx/internal/concat.hxx"
27 #include "pqxx/internal/encoding_group.hxx"
28 #include "pqxx/internal/encodings.hxx"
29 #include "pqxx/internal/gates/connection-stream_from.hxx"
30 #include "pqxx/internal/stream_iterator.hxx"
31 #include "pqxx/separated_list.hxx"
32 #include "pqxx/transaction_base.hxx"
33 #include "pqxx/transaction_focus.hxx"
34 #include "pqxx/util.hxx"
35 
36 
37 namespace pqxx
38 {
39 class transaction_base;
40 } // namespace pqxx
41 
42 
43 namespace pqxx::internal
44 {
47 {};
48 
49 
50 // C++20: Can we use generators, and maybe get speedup from HALO?
52 
79 template<typename... TYPE> class stream_query : transaction_focus
80 {
81 public:
82  using line_handle = std::unique_ptr<char, void (*)(void const *)>;
83 
85  inline stream_query(transaction_base &tx, std::string_view query);
86 
87  stream_query(stream_query &&) = delete;
88  stream_query &operator=(stream_query &&) = delete;
89 
90  ~stream_query() noexcept
91  {
92  try
93  {
94  close();
95  }
96  catch (std::exception const &e)
97  {
98  reg_pending_error(e.what());
99  }
100  }
101 
103  bool done() const & noexcept { return m_char_finder == nullptr; }
104 
106  inline auto begin() &;
108 
112  auto end() const & { return stream_query_end_iterator{}; }
113 
115  std::tuple<TYPE...> parse_line(zview line) &
116  {
117  assert(not done());
118 
119  auto const line_size{std::size(line)};
120 
121  // This function uses m_row as a buffer, across calls. The only reason for
122  // it to carry over across calls is to avoid reallocation.
123 
124  // Make room for unescaping the line. It's a pessimistic size.
125  // Unusually, we're storing terminating zeroes *inside* the string.
126  // This is the only place where we modify m_row. MAKE SURE THE BUFFER DOES
127  // NOT GET RESIZED while we're working, because we're working with views
128  // into its buffer.
129  m_row.resize(line_size + 1);
130 
131  std::size_t offset{0u};
132  char *write{m_row.data()};
133 
134  // DO NOT shrink m_row to fit. We're carrying views pointing into the
135  // buffer. (Also, how useful would shrinking really be?)
136 
137  // Folding expression: scan and unescape each field, and convert it to its
138  // requested type.
139  std::tuple<TYPE...> data{parse_field<TYPE>(line, offset, write)...};
140 
141  assert(offset == line_size + 1u);
142  return data;
143  }
144 
146  std::pair<line_handle, std::size_t> read_line() &;
147 
148 private:
150 
153  static inline char_finder_func *get_finder(transaction_base const &tx);
154 
156 
170  std::tuple<std::size_t, char *, zview>
171  read_field(zview line, std::size_t offset, char *write)
172  {
173 #if !defined(NDEBUG)
174  auto const line_size{std::size(line)};
175 #endif
176 
177  assert(offset <= line_size);
178 
179  char const *lp{std::data(line)};
180 
181  // The COPY line now ends in a tab. (We replace the trailing newline with
182  // that to simplify the loop here.)
183  assert(lp[line_size] == '\t');
184  assert(lp[line_size + 1] == '\0');
185 
186  if ((lp[offset] == '\\') and (lp[offset + 1] == 'N'))
187  {
188  // Null field. Consume the "\N" and the field separator.
189  offset += 3;
190  assert(offset <= (line_size + 1));
191  assert(lp[offset - 1] == '\t');
192  // Return a null value. There's nothing to write into m_row.
193  return {offset, write, {}};
194  }
195 
196  // Beginning of the field text in the row buffer.
197  char const *const field_begin{write};
198 
199  // We're relying on several assumptions just for making the main loop
200  // condition work:
201  // * The COPY line ends in a newline.
202  // * Multibyte characters never start with an ASCII-range byte.
203  // * We can index a view beyond its bounds (but within its address space).
204  //
205  // Effectively, the newline acts as a final field separator.
206  while (lp[offset] != '\t')
207  {
208  assert(lp[offset] != '\0');
209 
210  // Beginning of the next character of interest (or the end of the line).
211  auto const stop_char{m_char_finder(line, offset)};
212  PQXX_ASSUME(stop_char > offset);
213  assert(stop_char < (line_size + 1));
214 
215  // Copy the text we have so far. It's got no special characters in it.
216  std::memcpy(write, &lp[offset], stop_char - offset);
217  write += (stop_char - offset);
218  offset = stop_char;
219 
220  // We're still within the line.
221  char const special{lp[offset]};
222  if (special == '\\')
223  {
224  // Escape sequence.
225  // Consume the backslash.
226  ++offset;
227  assert(offset < line_size);
228 
229  // The database will only escape ASCII characters, so we assume that
230  // we're dealing with a single-byte character.
231  char const escaped{lp[offset]};
232  assert((escaped >> 7) == 0);
233  ++offset;
234  *write++ = unescape_char(escaped);
235  }
236  else
237  {
238  // Field separator. Fall out of the loop.
239  assert(special == '\t');
240  }
241  }
242 
243  // Hit the end of the field.
244  assert(lp[offset] == '\t');
245  *write = '\0';
246  ++write;
247  ++offset;
248  return {offset, write, {field_begin, write - field_begin - 1}};
249  }
250 
252 
265  template<typename TARGET>
266  TARGET parse_field(zview line, std::size_t &offset, char *&write)
267  {
268  using field_type = strip_t<TARGET>;
269  using nullity = nullness<field_type>;
270 
271  assert(offset <= std::size(line));
272 
273  auto [new_offset, new_write, text]{read_field(line, offset, write)};
274  PQXX_ASSUME(new_offset > offset);
275  PQXX_ASSUME(new_write >= write);
276  offset = new_offset;
277  write = new_write;
278  if constexpr (nullity::always_null)
279  {
280  if (std::data(text) != nullptr)
281  throw conversion_error{concat(
282  "Streaming a non-null value into a ", type_name<field_type>,
283  ", which must always be null.")};
284  }
285  else if (std::data(text) == nullptr)
286  {
287  if constexpr (nullity::has_null)
288  return nullity::null();
289  else
290  internal::throw_null_conversion(type_name<field_type>);
291  }
292  else
293  {
294  // Don't ever try to convert a non-null value to nullptr_t!
295  return from_string<field_type>(text);
296  }
297  }
298 
300  void close() noexcept
301  {
302  if (not done())
303  {
304  m_char_finder = nullptr;
305  unregister_me();
306  }
307  }
308 
310 
314  char_finder_func *m_char_finder;
315 
317 
321  std::string m_row;
322 };
323 } // namespace pqxx::internal
324 #endif
Stream query results from the database. Used by transaction_base::stream.
Definition: stream_query.hxx:80
std::tuple< TYPE... > parse_line(zview line) &
Parse and convert the latest line of data we received.
Definition: stream_query.hxx:115
bool done() const &noexcept
Has this stream reached the end of its data?
Definition: stream_query.hxx:103
stream_query(transaction_base &tx, std::string_view query)
Execute query on tx, stream results.
Definition: stream_query_impl.hxx:12
auto begin() &
Begin iterator. Only for use by "range for".
Definition: stream_query_impl.hxx:153
std::pair< line_handle, std::size_t > read_line() &
Read a COPY line from the server.
Definition: stream_query_impl.hxx:161
auto end() const &
End iterator. Only for use by "range for".
Definition: stream_query.hxx:112
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:29
Marker-type wrapper: zero-terminated std::string_view.
Definition: zview.hxx:38
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:150
Internal items for libpqxx' own use. Do not use these yourself.
Definition: encodings.cxx:33
std::string concat(TYPE... item)
Efficiently combine a bunch of items into one big string.
Definition: concat.hxx:31
void throw_null_conversion(std::string const &type)
Throw exception for attempt to convert SQL NULL to given type.
Definition: strconv.cxx:255
std::size_t(std::string_view haystack, std::size_t start) char_finder_func
Function type: "find first occurrence of any of specific ASCII characters."
Definition: encoding_group.hxx:71
constexpr char unescape_char(char escaped) noexcept
Return original byte for escaped character.
Definition: util.hxx:633
The end() iterator for a stream_query.
Definition: stream_query.hxx:47
The home of all libpqxx classes, functions, templates, etc.
Definition: array.cxx:27