Loading...
Searching...
No Matches
runner.hpp
1/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_RUNNER_HPP
8#define BOOST_REDIS_RUNNER_HPP
9
10#include <boost/redis/adapter/any_adapter.hpp>
11#include <boost/redis/config.hpp>
12#include <boost/redis/request.hpp>
13#include <boost/redis/response.hpp>
14#include <boost/redis/detail/helper.hpp>
15#include <boost/redis/error.hpp>
16#include <boost/redis/logger.hpp>
17#include <boost/redis/operation.hpp>
18#include <boost/asio/compose.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/experimental/parallel_group.hpp>
21#include <boost/asio/ip/tcp.hpp>
22#include <boost/asio/steady_timer.hpp>
23#include <boost/asio/prepend.hpp>
24#include <boost/asio/ssl.hpp>
25#include <boost/asio/cancel_after.hpp>
26#include <string>
27#include <memory>
28#include <chrono>
29
30namespace boost::redis::detail
31{
32
33void push_hello(config const& cfg, request& req);
34
35// TODO: Can we avoid this whole function whose only purpose is to
36// check for an error in the hello response and complete with an error
37// so that the parallel group that starts it can exit?
38template <class Runner, class Connection, class Logger>
39struct hello_op {
40 Runner* runner_ = nullptr;
41 Connection* conn_ = nullptr;
42 Logger logger_;
43 asio::coroutine coro_{};
44
45 template <class Self>
46 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
47 {
48 BOOST_ASIO_CORO_REENTER (coro_)
49 {
50 runner_->add_hello();
51
52 BOOST_ASIO_CORO_YIELD
53 conn_->async_exec(runner_->hello_req_, any_adapter(runner_->hello_resp_), std::move(self));
54 logger_.on_hello(ec, runner_->hello_resp_);
55
56 if (ec) {
57 conn_->cancel(operation::run);
58 self.complete(ec);
59 return;
60 }
61
62 if (runner_->has_error_in_response()) {
63 conn_->cancel(operation::run);
64 self.complete(error::resp3_hello);
65 return;
66 }
67
68 self.complete({});
69 }
70 }
71};
72
73template <class Runner, class Connection, class Logger>
74class runner_op {
75private:
76 Runner* runner_ = nullptr;
77 Connection* conn_ = nullptr;
78 Logger logger_;
79 asio::coroutine coro_{};
80
81 using order_t = std::array<std::size_t, 5>;
82
83public:
84 runner_op(Runner* runner, Connection* conn, Logger l)
85 : runner_{runner}
86 , conn_{conn}
87 , logger_{l}
88 {}
89
90 template <class Self>
91 void operator()( Self& self
92 , order_t order = {}
93 , system::error_code ec0 = {}
94 , system::error_code ec1 = {}
95 , system::error_code ec2 = {}
96 , system::error_code ec3 = {}
97 , system::error_code ec4 = {})
98 {
99 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
100 {
101 BOOST_ASIO_CORO_YIELD
102 conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
103
104 logger_.on_resolve(ec0, conn_->resv_.results());
105
106 if (ec0) {
107 self.complete(ec0);
108 return;
109 }
110
111 BOOST_ASIO_CORO_YIELD
112 conn_->ctor_.async_connect(
113 conn_->next_layer().next_layer(),
114 conn_->resv_.results(),
115 asio::prepend(std::move(self), order_t {}));
116
117 logger_.on_connect(ec0, conn_->ctor_.endpoint());
118
119 if (ec0) {
120 self.complete(ec0);
121 return;
122 }
123
124 if (conn_->use_ssl()) {
125 BOOST_ASIO_CORO_YIELD
126 conn_->next_layer().async_handshake(
127 asio::ssl::stream_base::client,
128 asio::prepend(
129 asio::cancel_after(
130 runner_->cfg_.ssl_handshake_timeout,
131 std::move(self)
132 ),
133 order_t {}
134 )
135 );
136
137 logger_.on_ssl_handshake(ec0);
138
139 if (ec0) {
140 self.complete(ec0);
141 return;
142 }
143 }
144
145 conn_->reset();
146
147 // Note: Order is important here because the writer might
148 // trigger an async_write before the async_hello thereby
149 // causing an authentication problem.
150 BOOST_ASIO_CORO_YIELD
151 asio::experimental::make_parallel_group(
152 [this](auto token) { return runner_->async_hello(*conn_, logger_, token); },
153 [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
154 [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
155 [this](auto token) { return conn_->reader(logger_, token);},
156 [this](auto token) { return conn_->writer(logger_, token);}
157 ).async_wait(
158 asio::experimental::wait_for_one_error(),
159 std::move(self));
160
161 if (order[0] == 0 && !!ec0) {
162 self.complete(ec0);
163 return;
164 }
165
166 if (order[0] == 2 && ec2 == error::pong_timeout) {
167 self.complete(ec1);
168 return;
169 }
170
171 // The receive operation must be cancelled because channel
172 // subscription does not survive a reconnection but requires
173 // re-subscription.
174 conn_->cancel(operation::receive);
175
176 if (!conn_->will_reconnect()) {
177 conn_->cancel(operation::reconnection);
178 self.complete(ec3);
179 return;
180 }
181
182 // It is safe to use the writer timer here because we are not
183 // connected.
184 conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
185
186 BOOST_ASIO_CORO_YIELD
187 conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
188 if (ec0) {
189 self.complete(ec0);
190 return;
191 }
192
193 if (!conn_->will_reconnect()) {
194 self.complete(asio::error::operation_aborted);
195 return;
196 }
197
198 conn_->reset_stream();
199 }
200 }
201};
202
203template <class Executor>
204class runner {
205public:
206 runner(Executor ex, config cfg)
207 : cfg_{cfg}
208 { }
209
210 void set_config(config const& cfg)
211 {
212 cfg_ = cfg;
213 }
214
215 template <class Connection, class Logger, class CompletionToken>
216 auto async_run(Connection& conn, Logger l, CompletionToken token)
217 {
218 return asio::async_compose
219 < CompletionToken
220 , void(system::error_code)
221 >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
222 }
223
224private:
225
226 template <class, class, class> friend class runner_op;
227 template <class, class, class> friend struct hello_op;
228
229 template <class Connection, class Logger, class CompletionToken>
230 auto async_hello(Connection& conn, Logger l, CompletionToken token)
231 {
232 return asio::async_compose
233 < CompletionToken
234 , void(system::error_code)
235 >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
236 }
237
238 void add_hello()
239 {
240 hello_req_.clear();
241 if (hello_resp_.has_value())
242 hello_resp_.value().clear();
243 push_hello(cfg_, hello_req_);
244 }
245
246 bool has_error_in_response() const noexcept
247 {
248 if (!hello_resp_.has_value())
249 return true;
250
251 auto f = [](auto const& e)
252 {
253 switch (e.data_type) {
255 case resp3::type::blob_error: return true;
256 default: return false;
257 }
258 };
259
260 return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
261 }
262
263 request hello_req_;
264 generic_response hello_resp_;
265 config cfg_;
266};
267
268} // boost::redis::detail
269
270#endif // BOOST_REDIS_RUNNER_HPP
void clear()
Clears the request preserving allocated memory.
Definition request.hpp:102
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition response.hpp:35
@ pong_timeout
Pong (health-check) timeout.
@ resp3_hello
Resp3 hello command error.
@ reconnection
Cancels reconnection.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.