libpqxx  7.7.4
pipeline.hxx
1 /* Definition of the pqxx::pipeline class.
2  *
3  * Throughput-optimized mechanism for executing queries.
4  *
5  * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/pipeline instead.
6  *
7  * Copyright (c) 2000-2022, Jeroen T. Vermeulen.
8  *
9  * See COPYING for copyright license. If you did not receive a file called
10  * COPYING with this source code, please notify the distributor of this
11  * mistake, or contact the author.
12  */
13 #ifndef PQXX_H_PIPELINE
14 #define PQXX_H_PIPELINE
15 
16 #if !defined(PQXX_HEADER_PRE)
17 # error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
18 #endif
19 
20 #include <limits>
21 #include <map>
22 #include <string>
23 
24 #include "pqxx/transaction_base.hxx"
25 
26 
27 namespace pqxx
28 {
29 // TODO: libpq 14 introduced a similar "pipeline mode." Can we use that?
30 
32 
50 class PQXX_LIBEXPORT pipeline : public transaction_focus
51 {
52 public:
54  using query_id = long;
55 
56  pipeline(pipeline const &) = delete;
57  pipeline &operator=(pipeline const &) = delete;
58 
60  explicit pipeline(transaction_base &t) : transaction_focus{t, s_classname}
61  {
62  init();
63  }
65  pipeline(transaction_base &t, std::string_view tname) :
66  transaction_focus{t, s_classname, tname}
67  {
68  init();
69  }
70 
72  ~pipeline() noexcept;
73 
75 
81  query_id insert(std::string_view) &;
82 
84 
90  void complete();
91 
93 
102  void flush();
103 
105 
113  void cancel();
114 
116  [[nodiscard]] bool is_finished(query_id) const;
117 
119 
126  {
127  return retrieve(m_queries.find(qid)).second;
128  }
129 
131 
132  std::pair<query_id, result> retrieve();
133 
134  [[nodiscard]] bool empty() const noexcept { return std::empty(m_queries); }
135 
138 
149  int retain(int retain_max = 2) &;
150 
151 
153  void resume() &;
154 
155 private:
156  struct PQXX_PRIVATE Query
157  {
158  explicit Query(std::string_view q) :
159  query{std::make_shared<std::string>(q)}
160  {}
161 
162  std::shared_ptr<std::string> query;
163  result res;
164  };
165 
166  using QueryMap = std::map<query_id, Query>;
167 
168  void init();
169  void attach();
170  void detach();
171 
173  static constexpr query_id qid_limit() noexcept
174  {
175  // Parenthesise this to work around an eternal Visual C++ problem:
176  // Without the extra parentheses, unless NOMINMAX is defined, the
177  // preprocessor will mistake this "max" for its annoying built-in macro
178  // of the same name.
179  return (std::numeric_limits<query_id>::max)();
180  }
181 
183  PQXX_PRIVATE query_id generate_id();
184 
185  bool have_pending() const noexcept
186  {
187  return m_issuedrange.second != m_issuedrange.first;
188  }
189 
190  PQXX_PRIVATE void issue();
191 
193  void set_error_at(query_id qid) noexcept
194  {
195  PQXX_UNLIKELY
196  if (qid < m_error)
197  m_error = qid;
198  }
199 
201  [[noreturn]] PQXX_PRIVATE void internal_error(std::string const &err);
202 
203  PQXX_PRIVATE bool obtain_result(bool expect_none = false);
204 
205  PQXX_PRIVATE void obtain_dummy();
206  PQXX_PRIVATE void get_further_available_results();
207  PQXX_PRIVATE void check_end_results();
208 
210  PQXX_PRIVATE void receive_if_available();
211 
213  PQXX_PRIVATE void receive(pipeline::QueryMap::const_iterator stop);
214  std::pair<pipeline::query_id, result> retrieve(pipeline::QueryMap::iterator);
215 
216  QueryMap m_queries;
217  std::pair<QueryMap::iterator, QueryMap::iterator> m_issuedrange;
218  int m_retain = 0;
219  int m_num_waiting = 0;
220  query_id m_q_id = 0;
221 
223  bool m_dummy_pending = false;
224 
226  query_id m_error = qid_limit();
227 
229 
232  internal::encoding_group m_encoding;
233 
234  static constexpr std::string_view s_classname{"pipeline"};
235 };
236 } // namespace pqxx
237 #endif
Base class for things that monopolise a transaction's attention.
Definition: transaction_focus.hxx:28
The home of all libpqxx classes, functions, templates, etc.
Definition: array.hxx:26
pipeline(transaction_base &t)
Start a pipeline.
Definition: pipeline.hxx:60
long query_id
Identifying numbers for queries.
Definition: pipeline.hxx:54
Processes several queries in FIFO manner, optimized for high throughput.
Definition: pipeline.hxx:50
pipeline(transaction_base &t, std::string_view tname)
Start a pipeline. Assign it a name, for more helpful error messages.
Definition: pipeline.hxx:65
bool empty() const noexcept
Definition: pipeline.hxx:134
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:76
result retrieve(query_id qid)
Retrieve result for given query.
Definition: pipeline.hxx:125
Result set containing data returned by a query or command.
Definition: result.hxx:73
Internal error in libpqxx library.
Definition: except.hxx:166