7#ifndef BOOST_REDIS_CONNECTION_HPP
8#define BOOST_REDIS_CONNECTION_HPP
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/adapter/any_adapter.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/redis/detail/connector.hpp>
14#include <boost/redis/detail/health_checker.hpp>
15#include <boost/redis/detail/helper.hpp>
16#include <boost/redis/detail/resolver.hpp>
17#include <boost/redis/detail/resp3_handshaker.hpp>
18#include <boost/redis/error.hpp>
19#include <boost/redis/logger.hpp>
20#include <boost/redis/operation.hpp>
21#include <boost/redis/request.hpp>
22#include <boost/redis/resp3/type.hpp>
23#include <boost/redis/usage.hpp>
25#include <boost/asio/any_completion_handler.hpp>
26#include <boost/asio/any_io_executor.hpp>
27#include <boost/asio/associated_immediate_executor.hpp>
28#include <boost/asio/basic_stream_socket.hpp>
29#include <boost/asio/bind_executor.hpp>
30#include <boost/asio/buffer.hpp>
31#include <boost/asio/cancel_after.hpp>
32#include <boost/asio/coroutine.hpp>
33#include <boost/asio/deferred.hpp>
34#include <boost/asio/experimental/channel.hpp>
35#include <boost/asio/experimental/parallel_group.hpp>
36#include <boost/asio/io_context.hpp>
37#include <boost/asio/ip/tcp.hpp>
38#include <boost/asio/prepend.hpp>
39#include <boost/asio/read_until.hpp>
40#include <boost/asio/ssl/stream.hpp>
41#include <boost/asio/steady_timer.hpp>
42#include <boost/asio/write.hpp>
43#include <boost/assert.hpp>
45#include <boost/core/ignore_unused.hpp>
58namespace boost::redis {
62template <
class DynamicBuffer>
63std::string_view buffer_view(DynamicBuffer buf)
noexcept
65 char const* start =
static_cast<char const*
>(buf.data(0, buf.size()).data());
66 return std::string_view{start, std::size(buf)};
69template <
class AsyncReadStream,
class DynamicBuffer>
72 AsyncReadStream& stream_;
74 std::size_t size_ = 0;
76 asio::coroutine coro_{};
79 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
81 , buf_ {std::move(buf)}
86 void operator()( Self& self
87 , system::error_code ec = {}
90 BOOST_ASIO_CORO_REENTER (coro_)
96 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
102 buf_.shrink(buf_.size() - tmp_ - n);
103 self.complete({}, n);
108template <
class AsyncReadStream,
class DynamicBuffer,
class CompletionToken>
111 AsyncReadStream& stream,
112 DynamicBuffer buffer,
114 CompletionToken&& token)
116 return asio::async_compose
118 , void(system::error_code, std::size_t)
119 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
124 using req_info_type =
typename Conn::req_info;
125 using adapter_type =
typename Conn::adapter_type;
127 Conn* conn_ =
nullptr;
128 std::shared_ptr<req_info_type> info_ =
nullptr;
129 asio::coroutine coro{};
131 template <
class Self>
132 void operator()(Self& self , system::error_code = {}, std::size_t = 0)
134 BOOST_ASIO_CORO_REENTER (coro)
138 if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
139 BOOST_ASIO_CORO_YIELD
141 asio::get_associated_immediate_executor(self, self.get_io_executor()),
146 conn_->add_request_info(info_);
149 BOOST_ASIO_CORO_YIELD
150 info_->async_wait(std::move(self));
153 self.complete(info_->ec_, 0);
157 if (info_->stop_requested()) {
160 return self.complete(asio::error::operation_aborted, 0);
163 if (is_cancelled(self)) {
164 if (!info_->is_waiting()) {
165 using c_t = asio::cancellation_type;
166 auto const c = self.get_cancellation_state().cancelled();
167 if ((c & c_t::terminal) != c_t::none) {
171 return self.complete(asio::error::operation_aborted, 0);
174 self.get_cancellation_state().clear();
182 conn_->remove_request(info_);
183 self.complete(asio::error::operation_aborted, 0);
188 self.complete(info_->ec_, info_->read_size_);
193template <
class Conn,
class Logger>
197 asio::coroutine coro{};
199 template <
class Self>
200 void operator()( Self& self
201 , system::error_code ec = {}
206 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
208 while (conn_->coalesce_requests()) {
209 if (conn_->use_ssl())
210 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
212 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
214 logger_.on_write(ec, conn_->write_buffer_);
217 logger_.trace(
"writer_op (1)", ec);
228 if (!conn_->is_open()) {
229 logger_.trace(
"writer_op (2): connection is closed.");
235 BOOST_ASIO_CORO_YIELD
236 conn_->writer_timer_.async_wait(std::move(self));
237 if (!conn_->is_open()) {
238 logger_.trace(
"writer_op (3): connection is closed.");
249template <
class Conn,
class Logger>
251 using parse_result =
typename Conn::parse_result;
252 using parse_ret_type =
typename Conn::parse_ret_type;
255 parse_ret_type res_{parse_result::resp, 0};
256 asio::coroutine coro{};
258 template <
class Self>
259 void operator()( Self& self
260 , system::error_code ec = {}
265 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
268 if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
269 if (conn_->use_ssl()) {
270 BOOST_ASIO_CORO_YIELD
274 conn_->get_suggested_buffer_growth(),
277 BOOST_ASIO_CORO_YIELD
279 conn_->next_layer().next_layer(),
281 conn_->get_suggested_buffer_growth(),
285 logger_.on_read(ec, n);
289 logger_.trace(
"reader_op (1)", ec);
298 if (!conn_->is_open()) {
299 logger_.trace(
"reader_op (2): connection is closed.");
305 res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
307 logger_.trace(
"reader_op (3)", ec);
313 if (res_.first == parse_result::push) {
314 if (!conn_->receive_channel_.try_send(ec, res_.second)) {
315 BOOST_ASIO_CORO_YIELD
316 conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
320 logger_.trace(
"reader_op (4)", ec);
326 if (!conn_->is_open()) {
327 logger_.trace(
"reader_op (5): connection is closed.");
328 self.complete(asio::error::operation_aborted);
337template <
class Conn,
class Logger>
340 Conn* conn_ =
nullptr;
342 asio::coroutine coro_{};
344 using order_t = std::array<std::size_t, 5>;
347 run_op(Conn* conn, Logger l)
352 template <
class Self>
353 void operator()( Self& self
355 , system::error_code ec0 = {}
356 , system::error_code ec1 = {}
357 , system::error_code ec2 = {}
358 , system::error_code ec3 = {}
359 , system::error_code ec4 = {})
361 BOOST_ASIO_CORO_REENTER (coro_)
for (;;)
363 BOOST_ASIO_CORO_YIELD
364 conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
366 logger_.on_resolve(ec0, conn_->resv_.results());
373 BOOST_ASIO_CORO_YIELD
374 conn_->ctor_.async_connect(
375 conn_->next_layer().next_layer(),
376 conn_->resv_.results(),
377 asio::prepend(std::move(self), order_t {}));
379 logger_.on_connect(ec0, conn_->ctor_.endpoint());
386 if (conn_->use_ssl()) {
387 BOOST_ASIO_CORO_YIELD
388 conn_->next_layer().async_handshake(
389 asio::ssl::stream_base::client,
392 conn_->cfg_.ssl_handshake_timeout,
399 logger_.on_ssl_handshake(ec0);
412 BOOST_ASIO_CORO_YIELD
413 asio::experimental::make_parallel_group(
414 [
this](
auto token) {
return conn_->handshaker_.async_hello(*conn_, logger_, token); },
415 [
this](
auto token) {
return conn_->health_checker_.async_ping(*conn_, logger_, token); },
416 [
this](
auto token) {
return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
417 [
this](
auto token) {
return conn_->reader(logger_, token);},
418 [
this](
auto token) {
return conn_->writer(logger_, token);}
420 asio::experimental::wait_for_one_error(),
423 if (order[0] == 0 && !!ec0) {
438 if (!conn_->will_reconnect()) {
446 conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
448 BOOST_ASIO_CORO_YIELD
449 conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
455 if (!conn_->will_reconnect()) {
456 self.complete(asio::error::operation_aborted);
460 conn_->reset_stream();
477template <
class Executor>
483 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
490 {
return writer_timer_.get_executor();}
493 template <
class Executor1>
510 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
511 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
512 : ctx_{std::move(ctx)}
515 , receive_channel_{ex, 256}
517 , health_checker_{ex}
518 , dbuf_{read_buffer_, max_read_size}
521 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
527 asio::io_context& ioc,
528 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
529 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
571 class Logger = logger,
572 class CompletionToken = asio::default_completion_token_t<executor_type>>
577 CompletionToken&& token = {})
580 resv_.set_config(cfg);
581 ctor_.set_config(cfg);
582 health_checker_.set_config(cfg);
583 handshaker_.set_config(cfg);
584 l.set_prefix(cfg.log_prefix);
586 return asio::async_compose
588 , void(system::error_code)
589 >(detail::run_op<this_type, Logger>{
this, l}, token, writer_timer_);
614 template <
class CompletionToken = asio::default_completion_token_t<executor_type>>
616 {
return receive_channel_.async_receive(std::forward<CompletionToken>(token)); }
631 std::size_t size = 0;
633 auto f = [&](system::error_code
const& ec2, std::size_t n)
639 auto const res = receive_channel_.try_receive(f);
674 class CompletionToken = asio::default_completion_token_t<executor_type>
680 CompletionToken&& token = {})
690 template <
class CompletionToken = asio::default_completion_token_t<executor_type>>
695 CompletionToken&& token = {})
697 auto& adapter_impl = adapter.impl_;
698 BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size,
"Request and response have incompatible sizes.");
700 auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn),
get_executor());
702 return asio::async_compose
704 , void(system::error_code, std::size_t)
705 >(detail::exec_op<this_type>{
this, info}, token, writer_timer_);
726 cancel_unwritten_requests();
735 receive_channel_.cancel();
738 health_checker_.cancel();
743 health_checker_.cancel();
745 receive_channel_.cancel();
746 cancel_unwritten_requests();
752 auto run_is_canceled() const noexcept
753 {
return cancel_run_called_; }
765 { stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_); }
775 template <
class Response>
778 using namespace boost::redis::adapter;
779 auto g = boost_redis_adapt(
response);
780 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
788 using clock_type = std::chrono::steady_clock;
789 using clock_traits_type = asio::wait_traits<clock_type>;
790 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
792 using receive_channel_type = asio::experimental::channel<
executor_type, void(system::error_code, std::size_t)>;
793 using resolver_type = detail::resolver<Executor>;
794 using health_checker_type = detail::health_checker<Executor>;
795 using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
798 using exec_notifier_type = receive_channel_type;
800 auto use_ssl() const noexcept
803 auto cancel_on_conn_lost() -> std::size_t
806 auto cond = [](
auto const& ptr)
808 BOOST_ASSERT(ptr !=
nullptr);
810 if (ptr->is_waiting()) {
811 return !ptr->req_->get_config().cancel_on_connection_lost;
813 return !ptr->req_->get_config().cancel_if_unresponded;
817 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
819 auto const ret = std::distance(point, std::end(reqs_));
821 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
825 reqs_.erase(point, std::end(reqs_));
827 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
828 return ptr->mark_waiting();
834 auto cancel_unwritten_requests() -> std::size_t
836 auto f = [](
auto const& ptr)
838 BOOST_ASSERT(ptr !=
nullptr);
839 return !ptr->is_waiting();
842 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
844 auto const ret = std::distance(point, std::end(reqs_));
846 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
850 reqs_.erase(point, std::end(reqs_));
858 if (std::exchange(cancel_run_called_,
true)) {
863 writer_timer_.cancel();
864 receive_channel_.cancel();
865 cancel_on_conn_lost();
872 write_buffer_.clear();
875 cancel_push_requests();
879 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
880 BOOST_ASSERT_MSG(ptr !=
nullptr,
"Expects non-null pointer.");
881 if (ptr->is_staged()) {
890 using wrapped_adapter_type = std::function<void(node_type
const&, system::error_code&)>;
896 , expected_responses_{req.get_expected_responses()}
897 , status_{status::waiting}
901 adapter_ = [
this, adapter](node_type
const& nd, system::error_code& ec)
903 auto const i = req_->get_expected_responses() - expected_responses_;
910 notifier_.try_send(std::error_code{}, 0);
918 [[nodiscard]]
auto is_waiting() const noexcept
919 {
return status_ == status::waiting; }
921 [[nodiscard]]
auto is_written() const noexcept
922 {
return status_ == status::written; }
924 [[nodiscard]]
auto is_staged() const noexcept
925 {
return status_ == status::staged; }
927 void mark_written() noexcept
928 { status_ = status::written; }
930 void mark_staged() noexcept
931 { status_ = status::staged; }
933 void mark_waiting() noexcept
934 { status_ = status::waiting; }
936 [[nodiscard]]
auto stop_requested() const noexcept
937 {
return !notifier_.is_open();}
939 template <
class CompletionToken>
940 auto async_wait(CompletionToken&& token)
942 return notifier_.async_receive(std::forward<CompletionToken>(token));
952 exec_notifier_type notifier_;
954 wrapped_adapter_type adapter_;
957 std::size_t expected_responses_;
960 system::error_code ec_;
961 std::size_t read_size_;
964 void remove_request(std::shared_ptr<req_info>
const& info)
966 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
969 using reqs_type = std::deque<std::shared_ptr<req_info>>;
971 template <
class,
class>
friend struct detail::reader_op;
972 template <
class,
class>
friend struct detail::writer_op;
973 template <
class>
friend struct detail::exec_op;
974 template <
class,
class>
friend class detail::run_op;
976 void cancel_push_requests()
978 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
979 return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
982 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
986 reqs_.erase(point, std::end(reqs_));
989 [[nodiscard]]
bool is_writing() const noexcept
991 return !write_buffer_.empty();
994 void add_request_info(std::shared_ptr<req_info>
const& info)
996 reqs_.push_back(info);
998 if (info->req_->has_hello_priority()) {
999 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](
auto const& e) {
1000 return e->is_waiting();
1003 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
1006 if (is_open() && !is_writing())
1007 writer_timer_.cancel();
1010 template <
class CompletionToken,
class Logger>
1011 auto reader(Logger l, CompletionToken&& token)
1013 return asio::async_compose
1015 , void(system::error_code)
1016 >(detail::reader_op<this_type, Logger>{
this, l},
1017 std::forward<CompletionToken>(token), writer_timer_);
1020 template <
class CompletionToken,
class Logger>
1021 auto writer(Logger l, CompletionToken&& token)
1023 return asio::async_compose
1025 , void(system::error_code)
1026 >(detail::writer_op<this_type, Logger>{
this, l}, std::forward<CompletionToken>(token), writer_timer_);
1029 [[nodiscard]]
bool coalesce_requests()
1033 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](
auto const& ri) {
1034 return !ri->is_waiting();
1037 std::for_each(point, std::cend(reqs_), [
this](
auto const& ri) {
1039 write_buffer_ += ri->req_->payload();
1041 usage_.commands_sent += ri->expected_responses_;
1044 usage_.bytes_sent += std::size(write_buffer_);
1046 return point != std::cend(reqs_);
1049 bool is_waiting_response() const noexcept
1051 if (std::empty(reqs_))
1058 return !reqs_.front()->is_waiting();
1063 if (stream_->next_layer().is_open()) {
1064 system::error_code ec;
1065 stream_->next_layer().close(ec);
1069 auto is_open() const noexcept {
return stream_->next_layer().is_open(); }
1070 auto& lowest_layer() noexcept {
return stream_->lowest_layer(); }
1074 BOOST_ASSERT(!read_buffer_.empty());
1098 if (reqs_.front()->expected_responses_ == 0)
1106 return reqs_.front()->is_waiting();
1109 auto get_suggested_buffer_growth() const noexcept
1111 return parser_.get_suggested_buffer_growth(4096);
1114 enum class parse_result { needs_more,
push, resp };
1116 using parse_ret_type = std::pair<parse_result, std::size_t>;
1118 parse_ret_type on_finish_parsing(parse_result t)
1120 if (t == parse_result::push) {
1121 usage_.pushes_received += 1;
1122 usage_.push_bytes_received += parser_.get_consumed();
1124 usage_.responses_received += 1;
1125 usage_.response_bytes_received += parser_.get_consumed();
1129 dbuf_.consume(parser_.get_consumed());
1130 auto const res = std::make_pair(t, parser_.get_consumed());
1135 parse_ret_type on_read(std::string_view data, system::error_code& ec)
1148 on_push_ = is_next_push();
1151 if (!resp3::parse(parser_, data, receive_adapter_, ec))
1152 return std::make_pair(parse_result::needs_more, 0);
1155 return std::make_pair(parse_result::push, 0);
1157 return on_finish_parsing(parse_result::push);
1160 BOOST_ASSERT_MSG(is_waiting_response(),
"Not waiting for a response (using MONITOR command perhaps?)");
1161 BOOST_ASSERT(!reqs_.empty());
1162 BOOST_ASSERT(reqs_.front() !=
nullptr);
1163 BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
1165 if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
1166 return std::make_pair(parse_result::needs_more, 0);
1169 reqs_.front()->ec_ = ec;
1170 reqs_.front()->proceed();
1171 return std::make_pair(parse_result::resp, 0);
1174 reqs_.front()->read_size_ += parser_.get_consumed();
1176 if (--reqs_.front()->expected_responses_ == 0) {
1178 reqs_.front()->proceed();
1182 return on_finish_parsing(parse_result::resp);
1187 write_buffer_.clear();
1188 read_buffer_.clear();
1191 cancel_run_called_ =
false;
1194 asio::ssl::context ctx_;
1195 std::unique_ptr<next_layer_type> stream_;
1200 timer_type writer_timer_;
1201 receive_channel_type receive_channel_;
1202 resolver_type resv_;
1203 detail::connector ctor_;
1204 health_checker_type health_checker_;
1205 resp3_handshaker_type handshaker_;
1206 receiver_adapter_type receive_adapter_;
1208 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
1211 std::string read_buffer_;
1212 dyn_buffer_type dbuf_;
1213 std::string write_buffer_;
1215 resp3::parser parser_{};
1216 bool on_push_ =
false;
1217 bool cancel_run_called_ =
false;
1240 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1241 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1246 asio::io_context& ioc,
1247 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1248 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1252 {
return impl_.get_executor(); }
1255 template <
class CompletionToken = asio::deferred_t>
1258 return asio::async_initiate<
1259 CompletionToken, void(boost::system::error_code)>(
1262 self->async_run_impl(*cfg, l, std::move(handler));
1263 }, token,
this, &cfg, l);
1267 template <
class CompletionToken = asio::deferred_t>
1269 {
return impl_.async_receive(std::forward<CompletionToken>(token)); }
1274 return impl_.receive(ec);
1278 template <
class Response,
class CompletionToken = asio::deferred_t>
1281 return async_exec(req,
any_adapter(resp), std::forward<CompletionToken>(token));
1285 template <
class CompletionToken = asio::deferred_t>
1290 CompletionToken&& token = {})
1292 return asio::async_initiate<
1293 CompletionToken, void(boost::system::error_code, std::size_t)>(
1296 self->async_exec_impl(*req, std::move(adapter), std::move(handler));
1297 }, token,
this, &req, std::move(adapter));
1305 {
return impl_.will_reconnect();}
1309 {
return impl_.next_layer(); }
1313 {
return impl_.next_layer(); }
1317 { impl_.reset_stream();}
1320 template <
class Response>
1322 { impl_.set_receive_response(
response); }
1326 {
return impl_.get_usage(); }
1330 {
return impl_.get_ssl_context();}
1337 asio::any_completion_handler<
void(boost::system::error_code)> token);
1343 asio::any_completion_handler<
void(boost::system::error_code, std::size_t)> token);
A type-erased reference to a response.
A SSL connection to the Redis server.
void reset_stream()
Resets the underlying stream.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
bool will_reconnect() const noexcept
Returns true if the connection was canceled.
executor_type get_executor() noexcept
Returns the associated executor.
usage get_usage() const noexcept
Returns connection usage information.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
auto async_run(config const &cfg={}, Logger l=Logger{}, CompletionToken &&token={})
Starts underlying connection operations.
basic_connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from a context.
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
basic_connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructor.
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
void cancel(operation op=operation::all)
Cancel operations.
auto & next_layer() noexcept
Returns a reference to the next layer.
std::size_t receive(system::error_code &ec)
Receives server pushes synchronously without blocking.
auto async_exec(request const &req, any_adapter adapter, CompletionToken &&token={})
Executes commands on the Redis server asynchronously.
auto async_exec(request const &req, Response &resp=ignore, CompletionToken &&token={})
Executes commands on the Redis server asynchronously.
auto async_receive(CompletionToken &&token={})
Receives server side pushes asynchronously.
Executor executor_type
Executor type.
Rebinds the socket type to another executor.
A basic_connection that type erases the executor.
auto async_exec(request const &req, Response &resp, CompletionToken &&token={})
Calls boost::redis::basic_connection::async_exec.
bool will_reconnect() const noexcept
Calls boost::redis::basic_connection::will_reconnect.
std::size_t receive(system::error_code &ec)
Calls boost::redis::basic_connection::receive.
connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Contructs from an executor.
asio::any_io_executor executor_type
Executor type.
auto async_exec(request const &req, any_adapter adapter, CompletionToken &&token={})
Calls boost::redis::basic_connection::async_exec.
connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Contructs from a context.
auto & next_layer() noexcept
Calls boost::redis::basic_connection::next_layer.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
auto const & next_layer() const noexcept
Calls boost::redis::basic_connection::next_layer.
void cancel(operation op=operation::all)
Calls boost::redis::basic_connection::cancel.
auto async_run(config const &cfg, logger l, CompletionToken &&token={})
Calls boost::redis::basic_connection::async_run.
usage get_usage() const noexcept
Returns connection usage information.
auto async_receive(CompletionToken &&token={})
Calls boost::redis::basic_connection::async_receive.
executor_type get_executor() noexcept
Returns the underlying executor.
void reset_stream()
Calls boost::redis::basic_connection::reset_stream.
std::chrono::steady_clock::duration reconnect_wait_interval
Time waited before trying a reconnection.
bool use_ssl
Uses SSL instead of a plain connection.
ignore_t ignore
Global ignore object.
std::decay_t< decltype(std::ignore)> ignore_t
Type used to ignore responses.
operation
Connection operations that can be cancelled.
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
@ sync_receive_push_failed
Can't receive push synchronously without blocking.
@ pong_timeout
Connect timeout.
@ not_connected
There is no stablished connection.
@ health_check
Health check operation.
@ exec
Refers to connection::async_exec operations.
@ resolve
Resolve operation.
@ reconnection
Cancels reconnection.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Connection usage information.
A node in the response tree.