libpqxx
The C++ client library for PostgreSQL
stream_query_impl.hxx
1 /* Code for parts of pqxx::internal::stream_query.
2  *
3  * These definitions needs to be in a separate file in order to iron out
4  * circular dependencies between headers.
5  */
6 #if !defined(PQXX_H_STREAM_QUERY_IMPL)
7 # define PQXX_H_STREAM_QUERY_IMPL
8 
9 namespace pqxx::internal
10 {
11 template<typename... TYPE>
13  transaction_base &tx, std::string_view query) :
14  transaction_focus{tx, "stream_query"}, m_char_finder{get_finder(tx)}
15 {
16  auto const r{tx.exec0(internal::concat("COPY (", query, ") TO STDOUT"))};
17  if (r.columns() != sizeof...(TYPE))
18  throw usage_error{concat(
19  "Parsing query stream with wrong number of columns: "
20  "code expects ",
21  sizeof...(TYPE), " but query returns ", r.columns(), ".")};
22  register_me();
23 }
24 
25 
26 template<typename... TYPE>
27 inline char_finder_func *
29 {
30  auto const group{enc_group(tx.conn().encoding_id())};
31  return get_s_char_finder<'\t', '\\'>(group);
32 }
33 
34 
35 // C++20: Replace with generator? Could be faster (local vars vs. members).
37 
40 template<typename... TYPE> class stream_query_input_iterator
41 {
42  using stream_t = stream_query<TYPE...>;
43 
44 public:
45  using value_type = std::tuple<TYPE...>;
46  using difference_type = long;
47 
48  explicit stream_query_input_iterator(stream_t &home) :
49  m_home(&home),
50  m_line{typename stream_query<TYPE...>::line_handle(
52  {
53  consume_line();
54  }
57 
60  {
61  assert(not done());
62  consume_line();
63  return *this;
64  }
65 
67 
70  {
71  ++*this;
72  return {};
73  }
74 
76  value_type operator*() const
77  {
78  return m_home->parse_line(zview{m_line.get(), m_line_size});
79  }
80 
82  bool operator==(stream_query_end_iterator) const noexcept { return done(); }
85  {
86  return not done();
87  }
88 
90  operator=(stream_query_input_iterator &&rhs) noexcept
91  {
92  if (&rhs != this)
93  {
94  m_line = std::move(rhs.m_line);
95  m_home = rhs.m_home;
96  m_line_size = rhs.m_line_size;
97  }
98  return *this;
99  }
100 
101 private:
102  stream_query_input_iterator() {}
103 
105  bool done() const noexcept { return m_home->done(); }
106 
108 
111  void consume_line() &
112  {
113  auto [line, size]{m_home->read_line()};
114  m_line = std::move(line);
115  m_line_size = size;
116  if (m_line)
117  {
118  // We know how many fields to expect. Replace the newline at the end
119  // with the field separator, so the parsing loop only needs to scan for a
120  // tab, not a tab or a newline.
121  char *const ptr{m_line.get()};
122  assert(ptr[size] == '\n');
123  ptr[size] = '\t';
124  }
125  }
126 
127  stream_t *m_home;
128 
130  typename stream_t::line_handle m_line;
131 
133  std::size_t m_line_size;
134 };
135 
136 
137 template<typename... TYPE>
138 inline bool operator==(
139  stream_query_end_iterator, stream_query_input_iterator<TYPE...> const &i)
140 {
141  return i.done();
142 }
143 
144 
145 template<typename... TYPE>
146 inline bool operator!=(
147  stream_query_end_iterator, stream_query_input_iterator<TYPE...> const &i)
148 {
149  return not i.done();
150 }
151 
152 
153 template<typename... TYPE> inline auto stream_query<TYPE...>::begin() &
154 {
155  return stream_query_input_iterator<TYPE...>{*this};
156 }
157 
158 
159 template<typename... TYPE>
160 inline std::pair<typename stream_query<TYPE...>::line_handle, std::size_t>
162 {
163  assert(not done());
164 
165  internal::gate::connection_stream_from gate{m_trans->conn()};
166  try
167  {
168  auto line{gate.read_copy_line()};
169  // Check for completion.
170  if (not line.first)
171  PQXX_UNLIKELY close();
172  return line;
173  }
174  catch (std::exception const &)
175  {
176  close();
177  throw;
178  }
179 }
180 } // namespace pqxx::internal
181 #endif
int encoding_id() const
Get the connection's encoding, as a PostgreSQL-defined code.
Definition: connection.cxx:1112
Input iterator for stream_query.
Definition: stream_query_impl.hxx:41
bool operator!=(stream_query_end_iterator) const noexcept
Comparison only works for comparing to end().
Definition: stream_query_impl.hxx:84
bool operator==(stream_query_end_iterator) const noexcept
Are we at the end?
Definition: stream_query_impl.hxx:82
value_type operator*() const
Dereference. There's no caching in here, so don't repeat calls.
Definition: stream_query_impl.hxx:76
stream_query_input_iterator operator++(int)
Post-increment. Only here to satisfy input_iterator concept.
Definition: stream_query_impl.hxx:69
stream_query_input_iterator & operator++() &
Pre-increment. This is what you'd normally want to use.
Definition: stream_query_impl.hxx:59
Stream query results from the database. Used by transaction_base::stream.
Definition: stream_query.hxx:80
std::tuple< TYPE... > parse_line(zview line) &
Parse and convert the latest line of data we received.
Definition: stream_query.hxx:115
bool done() const &noexcept
Has this stream reached the end of its data?
Definition: stream_query.hxx:103
stream_query(transaction_base &tx, std::string_view query)
Execute query on tx, stream results.
Definition: stream_query_impl.hxx:12
auto begin() &
Begin iterator. Only for use by "range for.".
Definition: stream_query_impl.hxx:153
std::pair< line_handle, std::size_t > read_line() &
Read a COPY line from the server.
Definition: stream_query_impl.hxx:161
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:29
Marker-type wrapper: zero-terminated std::string_view.
Definition: zview.hxx:38
Error in usage of libpqxx library, similar to std::logic_error.
Definition: except.hxx:249
result exec0(zview query, std::string_view desc)
Execute command, which should return zero rows of data.
Definition: transaction_base.hxx:387
constexpr connection & conn() const noexcept
The connection in which this transaction lives.
Definition: transaction_base.hxx:998
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:150
void pqfreemem(void const *ptr) noexcept
Wrapper for PQfreemem(), with C++ linkage.
Definition: util.cxx:205
Internal items for libpqxx' own use. Do not use these yourself.
Definition: encodings.cxx:33
std::string concat(TYPE... item)
Efficiently combine a bunch of items into one big string.
Definition: concat.hxx:31
pqxx::internal::encoding_group enc_group(std::string_view encoding_name)
Convert libpq encoding name to its libpqxx encoding group.
Definition: encodings.cxx:35
std::size_t(std::string_view haystack, std::size_t start) char_finder_func
Function type: "find first occurrence of specific any of ASCII characters.".
Definition: encoding_group.hxx:71
The end() iterator for a stream_query.
Definition: stream_query.hxx:47
Definition: connection-stream_from.hxx:13