7#ifndef BOOST_REDIS_RUNNER_HPP
8#define BOOST_REDIS_RUNNER_HPP
10#include <boost/redis/adapter/any_adapter.hpp>
11#include <boost/redis/config.hpp>
12#include <boost/redis/request.hpp>
13#include <boost/redis/response.hpp>
14#include <boost/redis/detail/helper.hpp>
15#include <boost/redis/error.hpp>
16#include <boost/redis/logger.hpp>
17#include <boost/redis/operation.hpp>
18#include <boost/asio/compose.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/experimental/parallel_group.hpp>
21#include <boost/asio/ip/tcp.hpp>
22#include <boost/asio/steady_timer.hpp>
23#include <boost/asio/prepend.hpp>
24#include <boost/asio/ssl.hpp>
25#include <boost/asio/cancel_after.hpp>
30namespace boost::redis::detail
// Declaration only; the definition is not part of this section.
// Takes the configuration by const reference and the request by mutable
// reference. Within this file it is invoked as push_hello(cfg_, hello_req_),
// right after hello_resp_ has been cleared (when it holds a value).
// NOTE(review): presumably appends the HELLO command derived from cfg to
// req -- confirm against the definition.
33void push_hello(config
const& cfg, request& req);
38template <
class Runner,
class Connection,
class Logger>
40 Runner* runner_ =
nullptr;
41 Connection* conn_ =
nullptr;
43 asio::coroutine coro_{};
46 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
48 BOOST_ASIO_CORO_REENTER (coro_)
53 conn_->async_exec(runner_->hello_req_, any_adapter(runner_->hello_resp_), std::move(self));
54 logger_.on_hello(ec, runner_->hello_resp_);
62 if (runner_->has_error_in_response()) {
73template <
class Runner,
class Connection,
class Logger>
76 Runner* runner_ =
nullptr;
77 Connection* conn_ =
nullptr;
79 asio::coroutine coro_{};
81 using order_t = std::array<std::size_t, 5>;
84 runner_op(Runner* runner, Connection* conn, Logger l)
91 void operator()( Self& self
93 , system::error_code ec0 = {}
94 , system::error_code ec1 = {}
95 , system::error_code ec2 = {}
96 , system::error_code ec3 = {}
97 , system::error_code ec4 = {})
99 BOOST_ASIO_CORO_REENTER (coro_)
for (;;)
101 BOOST_ASIO_CORO_YIELD
102 conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
104 logger_.on_resolve(ec0, conn_->resv_.results());
111 BOOST_ASIO_CORO_YIELD
112 conn_->ctor_.async_connect(
113 conn_->next_layer().next_layer(),
114 conn_->resv_.results(),
115 asio::prepend(std::move(self), order_t {}));
117 logger_.on_connect(ec0, conn_->ctor_.endpoint());
124 if (conn_->use_ssl()) {
125 BOOST_ASIO_CORO_YIELD
126 conn_->next_layer().async_handshake(
127 asio::ssl::stream_base::client,
130 runner_->cfg_.ssl_handshake_timeout,
137 logger_.on_ssl_handshake(ec0);
150 BOOST_ASIO_CORO_YIELD
151 asio::experimental::make_parallel_group(
152 [
this](
auto token) {
return runner_->async_hello(*conn_, logger_, token); },
153 [
this](
auto token) {
return conn_->health_checker_.async_ping(*conn_, logger_, token); },
154 [
this](
auto token) {
return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
155 [
this](
auto token) {
return conn_->reader(logger_, token);},
156 [
this](
auto token) {
return conn_->writer(logger_, token);}
158 asio::experimental::wait_for_one_error(),
161 if (order[0] == 0 && !!ec0) {
176 if (!conn_->will_reconnect()) {
184 conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
186 BOOST_ASIO_CORO_YIELD
187 conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
193 if (!conn_->will_reconnect()) {
194 self.complete(asio::error::operation_aborted);
198 conn_->reset_stream();
203template <
class Executor>
206 runner(Executor ex, config cfg)
210 void set_config(config
const& cfg)
215 template <
class Connection,
class Logger,
class CompletionToken>
216 auto async_run(Connection& conn, Logger l, CompletionToken token)
218 return asio::async_compose
220 , void(system::error_code)
221 >(runner_op<runner, Connection, Logger>{
this, &conn, l}, token, conn);
226 template <
class,
class,
class>
friend class runner_op;
227 template <
class,
class,
class>
friend struct hello_op;
229 template <
class Connection,
class Logger,
class CompletionToken>
230 auto async_hello(Connection& conn, Logger l, CompletionToken token)
232 return asio::async_compose
234 , void(system::error_code)
235 >(hello_op<runner, Connection, Logger>{
this, &conn, l}, token, conn);
241 if (hello_resp_.has_value())
242 hello_resp_.value().clear();
243 push_hello(cfg_, hello_req_);
246 bool has_error_in_response() const noexcept
248 if (!hello_resp_.has_value())
251 auto f = [](
auto const& e)
253 switch (e.data_type) {
256 default:
return false;
260 return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
void clear()
Clears the request preserving allocated memory.
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
@ pong_timeout
Pong timeout.
@ resp3_hello
Resp3 hello command error.
@ reconnection
Cancels reconnection.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.