libpqxx
The C++ client library for PostgreSQL
pipeline.hxx
Go to the documentation of this file.
1 /* Definition of the pqxx::pipeline class.
2  *
3  * Throughput-optimized mechanism for executing queries.
4  *
5  * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/pipeline instead.
6  *
7  * Copyright (c) 2000-2026, Jeroen T. Vermeulen.
8  *
9  * See COPYING for copyright license. If you did not receive a file called
10  * COPYING with this source code, please notify the distributor of this
11  * mistake, or contact the author.
12  */
13 #ifndef PQXX_PIPELINE_HXX
14 #define PQXX_PIPELINE_HXX
15 
16 #if !defined(PQXX_HEADER_PRE)
17 # error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
18 #endif
19 
20 #include <limits>
21 #include <map>
22 #include <string>
23 
25 
26 
27 namespace pqxx
28 {
29 // TODO: libpq 14 introduced a similar "pipeline mode." Can we use that?
30 
32 
51 {
52 public:
/// Identifying numbers for queries.
/** This is the type that insert() returns, that is_finished() and retrieve()
 * accept, and that keys the QueryMap.
 */
54  using query_id = long;
55 
// A pipeline is neither copyable nor movable: both copy operations and both
// move operations are explicitly deleted.
56  pipeline(pipeline const &) = delete;
57  pipeline &operator=(pipeline const &) = delete;
58  pipeline(pipeline &&) = delete;
59  pipeline &operator=(pipeline &&) = delete;
60 
61 
/// Start a pipeline.
/**
 * @param t Transaction handed to the transaction_focus base class, together
 * with this class's name (s_classname).
 * @param loc Source location; defaults to the caller's location
 * (sl::current()).  It is stored in m_created_loc and forwarded to init().
 */
63  explicit pipeline(transaction_base &t, sl loc = sl::current()) :
64  transaction_focus{t, s_classname}, m_created_loc{loc}
65  {
// The remaining set-up is done by init(), which is only declared in this
// header.
66  init(loc);
67  }
70  transaction_base &t, std::string_view tname, sl loc = sl::current()) :
71  transaction_focus{t, s_classname, tname}, m_created_loc{loc}
72  {
73  init(loc);
74  }
75 
77  ~pipeline() noexcept;
78 
80 
86  query_id insert(std::string_view, sl = sl::current()) &;
87 
89 
95  void complete(sl = sl::current());
96 
98 
107  void flush(sl = sl::current());
108 
110 
118  void cancel(sl = sl::current());
119 
121  [[nodiscard]] bool is_finished(query_id) const;
122 
124 
/// Retrieve the result for the query identified by qid.
/**
 * Looks qid up in m_queries and hands the resulting iterator to the private
 * retrieve() overload, then returns only the result half of the
 * (query_id, result) pair which that overload produces.
 *
 * NOTE(review): if qid is not present in m_queries, find() yields the end
 * iterator.  Presumably the private overload rejects that case -- confirm
 * against its definition, which is not in this header.
 *
 * @param qid Identifier of the query whose result is wanted.
 * @param loc Source location; defaults to the caller's location and is
 * forwarded unchanged.
 * @return The result taken from the pair returned by the private overload.
 */
130  result retrieve(query_id qid, sl loc = sl::current())
131  {
132  return retrieve(m_queries.find(qid), loc).second;
133  }
134 
136 
137  std::pair<query_id, result> retrieve(sl = sl::current());
138 
/// Is the pipeline's query map (m_queries) empty?  Declared noexcept.
139  [[nodiscard]] bool empty() const noexcept { return std::empty(m_queries); }
140 
142 
153  int retain(int retain_max = 2) &;
154 
156  void resume(sl loc = sl::current()) &;
157 
158 private:
/// Per-query record: the mapped type of QueryMap.
/** Holds the text of one query plus a result object for it.
 */
159  struct PQXX_PRIVATE Query final
160  {
/// Copy the query text q into a newly allocated, shared std::string.
/** The res member is not mentioned in the initialiser list, so it is
 * default-initialised.
 */
161  explicit Query(std::string_view q) :
162  query{std::make_shared<std::string>(q)}
163  {}
164 
/// Shared pointer to this record's own copy of the query text.
165  // NOLINTNEXTLINE(misc-non-private-member-variables-in-classes)
166  std::shared_ptr<std::string> query;
/// Result object belonging to this query.
// NOTE(review): presumably filled in once the query's result has been
// received -- confirm in the implementation file.
167  // NOLINTNEXTLINE(misc-non-private-member-variables-in-classes)
168  result res;
169  };
170 
/// Ordered map from query_id to that query's Query record.
/** This is the type of m_queries, and its iterators make up m_issuedrange.
 */
171  using QueryMap = std::map<query_id, Query>;
172 
173  void init(sl);
174  void attach();
175  void detach();
176 
/// Largest value that the query_id type can represent.
/** Also serves as the initial value of m_error.
 *
 * @return std::numeric_limits<query_id>::max(), as a compile-time constant.
 */
178  static constexpr query_id qid_limit() noexcept
179  {
180  // Parenthesise this to work around an eternal Visual C++ problem:
181  // Without the extra parentheses, unless NOMINMAX is defined, the
182  // preprocessor will mistake this "max" for its annoying built-in macro
183  // of the same name.
184  return (std::numeric_limits<query_id>::max)();
185  }
186 
188  PQXX_PRIVATE query_id generate_id();
189 
/// Is the range delimited by m_issuedrange non-empty?
/** Compares the two iterators stored in m_issuedrange; they differ exactly
 * when the range between them contains at least one entry.
 *
 * NOTE(review): presumably that range covers the queries which have been
 * issued but whose results are still outstanding -- confirm against the
 * implementation file.
 */
190  [[nodiscard]] bool have_pending() const noexcept
191  {
192  return m_issuedrange.second != m_issuedrange.first;
193  }
194 
195  PQXX_PRIVATE void issue(sl);
196 
/// Lower the recorded error position to qid, if qid comes before it.
/** Overwrites m_error only when qid is smaller than the value already stored
 * there, so calls to this function can only ever move m_error downwards.
 *
 * @param qid Query identifier to record as the error position.
 */
198  void set_error_at(query_id qid) noexcept
199  {
// The branch that actually updates m_error is marked as the unlikely one.
200  if (qid < m_error) [[unlikely]]
201  m_error = qid;
202  }
203 
205  [[noreturn]] PQXX_PRIVATE void internal_error(std::string const &err, sl);
206 
207  PQXX_PRIVATE bool obtain_result(bool expect_none, sl);
208 
209  PQXX_PRIVATE void obtain_dummy(sl);
210  PQXX_PRIVATE void get_further_available_results(sl);
211  PQXX_PRIVATE void check_end_results();
212 
214  PQXX_PRIVATE void receive_if_available(sl loc);
215 
217  PQXX_PRIVATE void receive(pipeline::QueryMap::const_iterator stop, sl);
218  std::pair<pipeline::query_id, result>
219  retrieve(pipeline::QueryMap::iterator, sl);
220 
221  QueryMap m_queries;
222  std::pair<QueryMap::iterator, QueryMap::iterator> m_issuedrange;
223  int m_retain = 0;
224  int m_num_waiting = 0;
225  query_id m_q_id = 0;
226 
228  bool m_dummy_pending = false;
229 
/// Error position: starts out at qid_limit(), the largest query_id value.
/** Within this header, the only code that changes it is set_error_at(), which
 * replaces it with any smaller query id it is given.
 */
231  query_id m_error = qid_limit();
232 
234 
238 
240  sl m_created_loc;
241 
/// Name of this class, as passed to the transaction_focus base class by the
/// constructors.
242  static constexpr std::string_view s_classname{"pipeline"};
243 };
244 } // namespace pqxx
245 #endif
Processes several queries in FIFO manner, optimized for high throughput.
Definition: pipeline.hxx:51
bool empty() const noexcept
Definition: pipeline.hxx:139
pipeline(transaction_base &t, std::string_view tname, sl loc=sl::current())
Start a pipeline. Assign it a name, for more helpful error messages.
Definition: pipeline.hxx:69
pipeline(transaction_base &t, sl loc=sl::current())
Start a pipeline.
Definition: pipeline.hxx:63
pipeline(pipeline &&)=delete
pipeline(pipeline const &)=delete
long query_id
Identifying numbers for queries.
Definition: pipeline.hxx:54
pipeline & operator=(pipeline &&)=delete
pipeline & operator=(pipeline const &)=delete
Result set containing data returned by a query or command.
Definition: result.hxx:101
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:29
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:151
#define PQXX_LIBEXPORT
Definition: header-pre.hxx:206
#define PQXX_PRIVATE
Definition: header-pre.hxx:207
The home of all libpqxx classes, functions, templates, etc.
Definition: array.cxx:26
std::source_location sl
Convenience alias for std::source_location. It's just too long.
Definition: types.hxx:38
encoding_group
Definition: encoding_group.hxx:40
@ unknown
Default: indeterminate encoding. All we know is it supports ASCII.