libpqxx
The C++ client library for PostgreSQL
stream_query_impl.hxx
Go to the documentation of this file.
1 /* Code for parts of pqxx::internal::stream_query.
2  *
3  * These definitions need to be in a separate file in order to iron out
4  * circular dependencies between headers.
5  */
6 #ifndef PQXX_INTERNAL_STREAM_QUERY_IMPL_HXX
7 #define PQXX_INTERNAL_STREAM_QUERY_IMPL_HXX
8 
9 namespace pqxx::internal
10 {
11 template<typename... TYPE>
13  transaction_base &tx, std::string_view query, conversion_context c) :
14  transaction_focus{tx, "stream_query"},
15  m_char_finder{get_finder(tx, c.loc)},
16  m_ctx{c}
17 {
18  auto const r{tx.exec(std::format("COPY ({}) TO STDOUT", query), m_ctx.loc)};
19  r.expect_columns(sizeof...(TYPE), m_ctx.loc);
20  r.expect_rows(0, m_ctx.loc);
21  register_me();
22 }
23 
24 
25 template<typename... TYPE>
28 {
29  auto const group{tx.conn().get_encoding_group(loc)};
30  return get_char_finder<'\t', '\\'>(group, loc);
31 }
32 
33 
34 // TODO: Replace with generator? Could be faster (local vars vs. members).
36 
44 template<typename... TYPE> class stream_query_iterator final
45 {
46  using stream_t = stream_query<TYPE...>;
47 
48 public:
49  using value_type = std::tuple<TYPE...>;
50  using difference_type = long;
51 
53  m_home(&home),
54  m_line{typename stream_query<TYPE...>::line_handle(
55  nullptr, pqxx::internal::pq::pqfreemem)},
56  m_created_loc{loc}
57  {
58  consume_line(loc);
59  }
60 
64 
67 
69 
73  {
74  assert(not done());
75  consume_line(m_created_loc);
76  return *this;
77  }
78 
81  {
82  return m_home->parse_line(std::string_view{m_line.get(), m_line_size});
83  }
84 
86  bool operator==(stream_query_end_iterator) const noexcept { return done(); }
87 
90  {
91  return not done();
92  }
93 
94  friend bool
96  {
97  return i.done();
98  }
99 
100  friend bool
102  {
103  return not i.done();
104  }
105 
106 private:
108  [[nodiscard]] bool done() const noexcept { return m_home->done(); }
109 
111 
114  void consume_line(sl loc) &
115  {
116  auto [line, size]{m_home->read_line(loc)};
117  m_line = std::move(line);
118  m_line_size = size;
119  if (m_line)
120  {
121  // We know how many fields to expect. Replace the newline at the end
122  // with the field separator, so the parsing loop only needs to scan for a
123  // tab, not a tab or a newline.
124  char *const ptr{m_line.get()};
125  assert(ptr[size] == '\n');
126  ptr[size] = '\t';
127  }
128  }
129 
130  stream_t *const m_home = nullptr;
131 
133  typename stream_t::line_handle m_line;
134 
136  std::size_t m_line_size = 0u;
137 
139  sl const m_created_loc;
140 };
141 
142 
143 template<typename... TYPE> inline auto stream_query<TYPE...>::begin() &
144 {
145  return stream_query_iterator<TYPE...>{*this, m_ctx.loc};
146 }
147 
148 
149 template<typename... TYPE>
150 inline std::pair<typename stream_query<TYPE...>::line_handle, std::size_t>
152 {
153  assert(not done());
154 
155  internal::gate::connection_stream_from gate{trans().conn()};
156  try
157  {
158  auto line{gate.read_copy_line(loc)};
159  if (not line.first) [[unlikely]]
160  {
161  // This is how we get told the iteration is finished.
162  close();
163  }
164  return line;
165  }
166  catch (std::exception const &)
167  {
168  close();
169  throw;
170  }
171 }
172 } // namespace pqxx::internal
173 #endif
encoding_group get_encoding_group(sl loc=sl::current()) const
Read the current client encoding's pqxx::encoding_group.
Definition: connection.hxx:536
Definition: connection-stream_from.hxx:17
Minimal iterator for stream_query.
Definition: stream_query_impl.hxx:45
stream_query_iterator & operator=(stream_query_iterator const &)=delete
stream_query_iterator(stream_t &home, sl loc)
Definition: stream_query_impl.hxx:52
bool operator==(stream_query_end_iterator) const noexcept
Are we at the end?
Definition: stream_query_impl.hxx:86
stream_query_iterator(stream_query_iterator const &)=delete
friend bool operator==(stream_query_end_iterator, stream_query_iterator const &i)
Definition: stream_query_impl.hxx:95
long difference_type
Definition: stream_query_impl.hxx:50
friend bool operator!=(stream_query_end_iterator, stream_query_iterator const &i)
Definition: stream_query_impl.hxx:101
value_type operator*() const
Dereference. There's no caching in here, so don't repeat calls.
Definition: stream_query_impl.hxx:80
stream_query_iterator & operator++() &
Pre-increment.
Definition: stream_query_impl.hxx:72
bool operator!=(stream_query_end_iterator) const noexcept
Do we have more iterations to go?
Definition: stream_query_impl.hxx:89
stream_query_iterator(stream_query_iterator &&)=delete
stream_query_iterator & operator=(stream_query_iterator &&)=delete
std::tuple< TYPE... > value_type
Definition: stream_query_impl.hxx:49
Stream query results from the database. Used by transaction_base::stream.
Definition: stream_query.hxx:68
stream_query(transaction_base &tx, std::string_view query, conversion_context c)
Execute query on tx, stream results.
Definition: stream_query_impl.hxx:12
bool done() const &noexcept
Has this stream reached the end of its data?
Definition: stream_query.hxx:94
std::pair< line_handle, std::size_t > read_line(sl) &
Read a COPY line from the server.
Definition: stream_query_impl.hxx:151
std::tuple< TYPE... > parse_line(std::string_view line) &
Parse and convert the latest line of data we received.
Definition: stream_query.hxx:110
std::unique_ptr< char[], void(*)(void const *)> line_handle
Definition: stream_query.hxx:70
auto begin() &
Begin iterator. Only for use by "range for".
Definition: stream_query_impl.hxx:143
result const & expect_rows(size_type n, sl loc=sl::current()) const
Check that result contains exactly n rows.
Definition: result.hxx:413
result const & expect_columns(row_size_type cols, sl loc=sl::current()) const
Expect that result consists of exactly cols columns.
Definition: result.hxx:476
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:29
void register_me()
Definition: transaction_base.cxx:560
constexpr connection & conn() const noexcept
The connection in which this transaction lives.
Definition: transaction_base.hxx:1118
result exec(std::string_view query, std::string_view desc, sl=sl::current())
Execute a command.
Definition: transaction_base.cxx:286
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:151
#define PQXX_RETURNS_NONNULL
Definition: header-pre.hxx:119
void pqfreemem(void const *ptr) noexcept
Wrapper for PQfreemem(), with C++ linkage.
Definition: util.cxx:299
Private namespace for libpqxx's internal use; do not access.
Definition: connection.cxx:333
std::size_t(std::string_view haystack, std::size_t start, sl) char_finder_func
Function type: "find first occurrence of any of these ASCII characters".
Definition: encoding_group.hxx:110
The end() iterator for a stream_query.
Definition: stream_query.hxx:35
The home of all libpqxx classes, functions, templates, etc.
Definition: array.cxx:26
std::source_location sl
Convenience alias for std::source_location. It's just too long.
Definition: types.hxx:38
format
Format code: is data text or binary?
Definition: types.hxx:121
Contextual parameters for string conversions implementations.
Definition: strconv.hxx:163
sl loc
A std::source_location for the call.
Definition: strconv.hxx:183