libpqxx  7.6.1
pipeline.hxx
1 /* Definition of the pqxx::pipeline class.
2  *
3  * Throughput-optimized mechanism for executing queries.
4  *
5  * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/pipeline instead.
6  *
7  * Copyright (c) 2000-2022, Jeroen T. Vermeulen.
8  *
9  * See COPYING for copyright license. If you did not receive a file called
10  * COPYING with this source code, please notify the distributor of this
11  * mistake, or contact the author.
12  */
13 #ifndef PQXX_H_PIPELINE
14 #define PQXX_H_PIPELINE
15 
16 #include "pqxx/compiler-public.hxx"
17 #include "pqxx/internal/compiler-internal-pre.hxx"
18 
19 #include <limits>
20 #include <map>
21 #include <string>
22 
23 #include "pqxx/transaction_base.hxx"
24 
25 
26 namespace pqxx
27 {
28 // TODO: libpq 14 introduced a similar "pipeline mode." Can we use that?
29 
31 
49 class PQXX_LIBEXPORT pipeline : public transaction_focus
50 {
51 public:
53  using query_id = long;
54 
55  pipeline(pipeline const &) = delete;
56  pipeline &operator=(pipeline const &) = delete;
57 
59  explicit pipeline(transaction_base &t) : transaction_focus{t, s_classname}
60  {
61  init();
62  }
64  pipeline(transaction_base &t, std::string_view tname) :
65  transaction_focus{t, s_classname, tname}
66  {
67  init();
68  }
69 
71  ~pipeline() noexcept;
72 
74 
80  query_id insert(std::string_view);
81 
83 
89  void complete();
90 
92 
101  void flush();
102 
104 
112  void cancel();
113 
115  [[nodiscard]] bool is_finished(query_id) const;
116 
118 
125  {
126  return retrieve(m_queries.find(qid)).second;
127  }
128 
130 
131  std::pair<query_id, result> retrieve();
132 
133  [[nodiscard]] bool empty() const noexcept { return std::empty(m_queries); }
134 
137 
148  int retain(int retain_max = 2);
149 
150 
152  void resume();
153 
154 private:
155  struct PQXX_PRIVATE Query
156  {
157  explicit Query(std::string_view q) :
158  query{std::make_shared<std::string>(q)}
159  {}
160 
161  std::shared_ptr<std::string> query;
162  result res;
163  };
164 
165  using QueryMap = std::map<query_id, Query>;
166 
167  void init();
168  void attach();
169  void detach();
170 
172  static constexpr query_id qid_limit() noexcept
173  {
174  // Parenthesise this to work around an eternal Visual C++ problem:
175  // Without the extra parentheses, unless NOMINMAX is defined, the
176  // preprocessor will mistake this "max" for its annoying built-in macro
177  // of the same name.
178  return (std::numeric_limits<query_id>::max)();
179  }
180 
182  PQXX_PRIVATE query_id generate_id();
183 
184  bool have_pending() const noexcept
185  {
186  return m_issuedrange.second != m_issuedrange.first;
187  }
188 
189  PQXX_PRIVATE void issue();
190 
192  void set_error_at(query_id qid) noexcept
193  {
194  PQXX_UNLIKELY
195  if (qid < m_error)
196  m_error = qid;
197  }
198 
200  [[noreturn]] PQXX_PRIVATE void internal_error(std::string const &err);
201 
202  PQXX_PRIVATE bool obtain_result(bool expect_none = false);
203 
204  PQXX_PRIVATE void obtain_dummy();
205  PQXX_PRIVATE void get_further_available_results();
206  PQXX_PRIVATE void check_end_results();
207 
209  PQXX_PRIVATE void receive_if_available();
210 
212  PQXX_PRIVATE void receive(pipeline::QueryMap::const_iterator stop);
213  std::pair<pipeline::query_id, result> retrieve(pipeline::QueryMap::iterator);
214 
215  QueryMap m_queries;
216  std::pair<QueryMap::iterator, QueryMap::iterator> m_issuedrange;
217  int m_retain = 0;
218  int m_num_waiting = 0;
219  query_id m_q_id = 0;
220 
222  bool m_dummy_pending = false;
223 
225  query_id m_error = qid_limit();
226 
228 
231  internal::encoding_group m_encoding;
232 
233  constexpr static std::string_view s_classname{"pipeline"};
234 };
235 } // namespace pqxx
236 
237 #include "pqxx/internal/compiler-internal-post.hxx"
238 #endif
pipeline(transaction_base &t, std::string_view tname)
Start a pipeline. Assign it a name, for more helpful error messages.
Definition: pipeline.hxx:64
Internal error in libpqxx library.
Definition: except.hxx:157
bool empty() const noexcept
Definition: pipeline.hxx:133
The home of all libpqxx classes, functions, templates, etc.
Definition: array.hxx:25
pipeline(transaction_base &t)
Start a pipeline.
Definition: pipeline.hxx:59
Processes several queries in FIFO manner, optimized for high throughput.
Definition: pipeline.hxx:49
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:27
result retrieve(query_id qid)
Retrieve result for given query.
Definition: pipeline.hxx:124
Result set containing data returned by a query or command.
Definition: result.hxx:70
long query_id
Identifying numbers for queries.
Definition: pipeline.hxx:53
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:75