libpqxx
The C++ client library for PostgreSQL
stream_query.hxx
Go to the documentation of this file.
1 /* Definition of the pqxx::internal::stream_query class.
2  *
3  * Enables optimized batch reads from a database table.
4  *
5  * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/transaction_base instead.
6  *
7  * Copyright (c) 2000-2026, Jeroen T. Vermeulen.
8  *
9  * See COPYING for copyright license. If you did not receive a file called
10  * COPYING with this source code, please notify the distributor of this
11  * mistake, or contact the author.
12  */
13 #ifndef PQXX_INTERNAL_STREAM_QUERY_HXX
14 #define PQXX_INTERNAL_STREAM_QUERY_HXX
15 
16 #if !defined(PQXX_HEADER_PRE)
17 # error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
18 #endif
19 
23 
24 
25 namespace pqxx
26 {
27 class transaction_base;
28 } // namespace pqxx
29 
30 
31 namespace pqxx::internal
32 {
35 {};
36 
37 
38 // TODO: Can we use generators, and maybe get speedup from HALO?
40 
67 template<typename... TYPE> class stream_query final : transaction_focus
68 {
69 public:
/// Owning handle for a line buffer: a `char[]` released through a
/// function-pointer deleter.  read_line() returns one of these.
70  using line_handle = std::unique_ptr<char[], void (*)(void const *)>;
71 
/// Execute query on tx, stream results.
/** Only declared here; the definition lives outside this listing.
 */
73  inline stream_query(
74  transaction_base &tx, std::string_view query, conversion_context c);
75 
// A stream_query can be neither copy-constructed, move-constructed, nor
// copy-assigned.
76  stream_query(stream_query const &) = delete;
77  stream_query(stream_query &&) = delete;
78  stream_query &operator=(stream_query const &) = delete;
80 
81  ~stream_query() noexcept
82  {
83  try
84  {
85  close();
86  }
87  catch (std::exception const &e)
88  {
89  reg_pending_error(e.what(), sl::current());
90  }
91  }
92 
94  [[nodiscard]] bool done() const & noexcept
95  {
96  return m_char_finder == nullptr;
97  }
98 
/// Begin iterator.  Only for use by "range for".
/** Only declared here; the definition lives outside this listing.
 */
100  [[nodiscard]] inline auto begin() &;
101 
103 
107  [[nodiscard]] auto end() const & { return stream_query_end_iterator{}; }
108 
110  std::tuple<TYPE...> parse_line(std::string_view line) &
111  {
112  assert(not done());
113 
114  auto const line_size{std::size(line)};
115 
116  // This function uses m_row as a buffer, across calls. The only reason for
117  // it to carry over across calls is to avoid reallocation.
118 
119  // Make room for unescaping the line. It's a pessimistic size.
120  // Unusually, we're storing terminating zeroes *inside* the string.
121  // This is the only place where we modify m_row. MAKE SURE THE BUFFER DOES
122  // NOT GET RESIZED while we're working, because we're working with views
123  // into its buffer.
124  m_row.resize(line_size + 1);
125 
126  std::size_t offset{0u};
127  char *write{m_row.data()};
128 
129  // DO NOT shrink m_row to fit. We're carrying views pointing into the
130  // buffer. (Also, how useful would shrinking really be?)
131 
132  // Folding expression: scan and unescape each field, and convert it to its
133  // requested type.
134  std::tuple<TYPE...> data{parse_field<TYPE>(line, offset, write, m_ctx)...};
135 
136  assert(offset == line_size + 1u);
137  return data;
138  }
139 
/// Read a COPY line from the server.
/** Only declared here; the definition lives outside this listing.
 *
 * @return A line_handle paired with a size.
 *
 * NOTE(review): the size is presumably the length of the line held by the
 * handle; confirm against the definition.
 */
141  std::pair<line_handle, std::size_t> read_line(sl) &;
142 
143 private:
145 
149  get_finder(transaction_base const &tx, sl);
150 
152 
166  std::tuple<std::size_t, char *, std::string_view>
167  read_field(std::string_view line, std::size_t offset, char *write, ctx c)
168  {
169 #if !defined(NDEBUG)
170  auto const line_size{std::size(line)};
171 #endif
172 
173  assert(offset <= line_size);
174 
175  char const *lp{std::data(line)};
176 
177  // The COPY line now ends in a tab. (We replace the trailing newline with
178  // that to simplify the loop here.)
179  assert(lp[line_size] == '\t');
180  assert(lp[line_size + 1] == '\0');
181 
182  if ((lp[offset] == '\\') and (lp[offset + 1] == 'N'))
183  {
184  // Null field. Consume the "\N" and the field separator.
185  offset += 3;
186  assert(offset <= (line_size + 1));
187  assert(lp[offset - 1] == '\t');
188  // Return a null value. There's nothing to write into m_row.
189  return {offset, write, {}};
190  }
191 
192  // Beginning of the field text in the row buffer.
193  char const *const field_begin{write};
194 
195  // We're relying on several assumptions just for making the main loop
196  // condition work:
197  // * The COPY line ends in a newline.
198  // * Multibyte characters never start with an ASCII-range byte.
199  // * We can index a view beyond its bounds (but within its address space).
200  //
201  // Effectively, the newline acts as a final field separator.
202  while (lp[offset] != '\t')
203  {
204  assert(lp[offset] != '\0');
205 
206  // Beginning of the next character of interest (or the end of the line).
207  // It may be right where we start searching, and this won't loop forever
208  // since the previous iteration (if any) put us right _after_ the
209  // previous character of interest.
210  auto const stop_char{m_char_finder(line, offset, c.loc)};
211  PQXX_ASSUME(stop_char >= offset);
212  assert(stop_char < (line_size + 1));
213 
214  // Copy the text we have so far. It's got no special characters in it.
215  std::memcpy(write, &lp[offset], stop_char - offset);
216  write += (stop_char - offset);
217  offset = stop_char;
218 
219  // We're still within the line.
220  char const special{lp[offset]};
221  if (special == '\\')
222  {
223  // Escape sequence.
224  // Consume the backslash.
225  ++offset;
226  assert(offset < line_size);
227 
228  // The database will only escape ASCII characters, so we assume that
229  // we're dealing with a single-byte character.
230  char const escaped{lp[offset]};
231 
232  // I think this is a valid way to check for the high bit: the shift
233  // may be signed or unsigned (implementation-defined for char), but
234  // either way we get a zero if the bit is clear or nonzero if it's set.
235  assert((escaped >> 7) == 0);
236  ++offset;
237  *write++ = unescape_char(escaped);
238  }
239  else
240  {
241  // Field separator. Fall out of the loop.
242  assert(special == '\t');
243  }
244  }
245 
246  // Hit the end of the field.
247  assert(lp[offset] == '\t');
248  *write = '\0';
249  ++write;
250  ++offset;
251  return {
252  offset,
253  write,
254  {field_begin, static_cast<std::size_t>(write - field_begin - 1)}};
255  }
256 
258 
/// Parse the next field of a line and convert it to TARGET.
/** Reads one field through read_field(), moves `offset` and `write` past it,
 * and turns the field's text into a TARGET.
 *
 * @param line The line being parsed.
 * @param offset In/out: where the field starts; updated to where the next
 *   one starts.
 * @param write In/out: position in the row buffer for the unescaped text;
 *   updated to the next free position.
 * @param c Conversion context, passed on to read_field() and from_string().
 *   Its `loc` is passed along when a null conversion is reported.
 * @return The converted value.  A null field produces `make_null<TARGET>()`
 *   when TARGET has a null value.
 * @throw conversion_error if TARGET must always be null but the field holds
 *   a value.  A null field for a TARGET without a null value is reported
 *   through internal::throw_null_conversion().
 */
template<typename TARGET>
TARGET
parse_field(std::string_view line, std::size_t &offset, char *&write, ctx c)
{
  using field_type = std::remove_cvref_t<TARGET>;

  assert(offset <= std::size(line));

  auto [new_offset, new_write, text]{read_field(line, offset, write, c)};
  PQXX_ASSUME(new_offset > offset);
  PQXX_ASSUME(new_write >= write);
  offset = new_offset;
  write = new_write;

  // read_field() signals a null field by returning a view with a null data
  // pointer.
  if constexpr (pqxx::always_null<TARGET>())
  {
    if (std::data(text) != nullptr)
      throw conversion_error{std::format(
        "Streaming a non-null value into a {}, which must always be null.",
        name_type<field_type>())};

    // The field is null, the only value this type can represent.  Return
    // it explicitly: falling off the end of a value-returning function is
    // undefined behaviour.
    return make_null<TARGET>();
  }
  else if (std::data(text) == nullptr)
  {
    if constexpr (has_null<TARGET>())
      return make_null<TARGET>();
    else
      internal::throw_null_conversion(name_type<field_type>(), c.loc);
  }
  else [[likely]]
  {
    // Don't ever try to convert a non-null value to nullptr_t!
    return from_string<field_type>(text, c);
  }
}
304 
306  void close() noexcept
307  {
308  if (not done())
309  {
310  m_char_finder = nullptr;
311  unregister_me();
312  }
313  }
314 
316 
/// Function that read_field() calls to locate the next special character.
/** Null once the stream is finished: done() tests for that, and close()
 * sets it.
 */
320  char_finder_func *m_char_finder;
321 
323 
/// Buffer that parse_line() fills with the unescaped fields of a line.
/** Kept as a member so that its allocation can be reused from one line to
 * the next.
 */
327  std::string m_row;
328 
/// Conversion context that parse_line() passes to every parse_field() call.
330  conversion_context const m_ctx;
331 };
332 } // namespace pqxx::internal
333 #endif
Stream query results from the database. Used by transaction_base::stream.
Definition: stream_query.hxx:68
stream_query(transaction_base &tx, std::string_view query, conversion_context c)
Execute query on tx, stream results.
Definition: stream_query_impl.hxx:12
bool done() const &noexcept
Has this stream reached the end of its data?
Definition: stream_query.hxx:94
stream_query & operator=(stream_query &&)=delete
stream_query(stream_query const &)=delete
std::pair< line_handle, std::size_t > read_line(sl) &
Read a COPY line from the server.
Definition: stream_query_impl.hxx:151
std::tuple< TYPE... > parse_line(std::string_view line) &
Parse and convert the latest line of data we received.
Definition: stream_query.hxx:110
~stream_query() noexcept
Definition: stream_query.hxx:81
stream_query & operator=(stream_query const &)=delete
stream_query(stream_query &&)=delete
std::unique_ptr< char[], void(*)(void const *)> line_handle
Definition: stream_query.hxx:70
auto begin() &
Begin iterator. Only for use by "range for".
Definition: stream_query_impl.hxx:143
auto end() const &
End iterator. Only for use by "range for".
Definition: stream_query.hxx:107
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:29
void unregister_me() noexcept
Definition: transaction_base.cxx:568
void reg_pending_error(std::string const &, sl) noexcept
Definition: transaction_base.cxx:576
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:151
#define PQXX_PURE
Definition: header-pre.hxx:64
#define PQXX_ASSUME(condition)
Definition: header-pre.hxx:228
#define PQXX_RETURNS_NONNULL
Definition: header-pre.hxx:119
Private namespace for libpqxx's internal use; do not access.
Definition: connection.cxx:333
void throw_null_conversion(std::string const &type, sl loc)
Throw exception for attempt to convert SQL NULL to given type.
Definition: strconv.cxx:142
constexpr PQXX_PURE char unescape_char(char escaped) noexcept
Return original byte for escaped character.
Definition: util.hxx:617
std::size_t(std::string_view haystack, std::size_t start, sl) char_finder_func
Function type: "find first occurrence of any of these ASCII characters".
Definition: encoding_group.hxx:110
The end() iterator for a stream_query.
Definition: stream_query.hxx:35
The home of all libpqxx classes, functions, templates, etc.
Definition: array.cxx:26
constexpr TYPE make_null() requires(pqxx
Return a null value of TYPE.
Definition: strconv.hxx:781
std::source_location sl
Convenience alias for std::source_location. It's just too long.
Definition: types.hxx:38
constexpr std::string_view name_type() noexcept
Return human-readable name for TYPE.
Definition: types.hxx:277
conversion_context const & ctx
Convenience alias: const reference to a pqxx::conversion_context.
Definition: strconv.hxx:201
format
Format code: is data text or binary?
Definition: types.hxx:121
Contextual parameters for string conversions implementations.
Definition: strconv.hxx:163