connection.hpp
1/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_CONNECTION_HPP
8#define BOOST_REDIS_CONNECTION_HPP
9
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/adapter/any_adapter.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/redis/detail/connector.hpp>
14#include <boost/redis/detail/health_checker.hpp>
15#include <boost/redis/detail/helper.hpp>
16#include <boost/redis/detail/resolver.hpp>
17#include <boost/redis/detail/resp3_handshaker.hpp>
18#include <boost/redis/error.hpp>
19#include <boost/redis/logger.hpp>
20#include <boost/redis/operation.hpp>
21#include <boost/redis/request.hpp>
22#include <boost/redis/resp3/type.hpp>
23#include <boost/redis/usage.hpp>
24
25#include <boost/asio/any_completion_handler.hpp>
26#include <boost/asio/any_io_executor.hpp>
27#include <boost/asio/associated_immediate_executor.hpp>
28#include <boost/asio/basic_stream_socket.hpp>
29#include <boost/asio/bind_executor.hpp>
30#include <boost/asio/buffer.hpp>
31#include <boost/asio/cancel_after.hpp>
32#include <boost/asio/coroutine.hpp>
33#include <boost/asio/deferred.hpp>
34#include <boost/asio/experimental/channel.hpp>
35#include <boost/asio/experimental/parallel_group.hpp>
36#include <boost/asio/io_context.hpp>
37#include <boost/asio/ip/tcp.hpp>
38#include <boost/asio/prepend.hpp>
39#include <boost/asio/read_until.hpp>
40#include <boost/asio/ssl/stream.hpp>
41#include <boost/asio/steady_timer.hpp>
42#include <boost/asio/write.hpp>
43#include <boost/assert.hpp>
44
45#include <boost/core/ignore_unused.hpp>
46
47#include <algorithm>
48#include <array>
49#include <chrono>
50#include <cstddef>
51#include <deque>
52#include <functional>
53#include <limits>
54#include <memory>
55#include <string_view>
56#include <utility>
57
58namespace boost::redis {
59namespace detail
60{
61
62template <class DynamicBuffer>
63std::string_view buffer_view(DynamicBuffer buf) noexcept
64{
65 char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
66 return std::string_view{start, std::size(buf)};
67}
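// Usage sketch (illustrative; `storage` is a hypothetical std::string): buffer_view
// returns a read-only view over everything currently stored in a DynamicBuffer_v2.
//
//    std::string storage = "+PONG\r\n";
//    std::string_view v = buffer_view(asio::dynamic_buffer(storage)); // "+PONG\r\n"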
68
69template <class AsyncReadStream, class DynamicBuffer>
70class append_some_op {
71private:
72 AsyncReadStream& stream_;
73 DynamicBuffer buf_;
74 std::size_t size_ = 0;
75 std::size_t tmp_ = 0;
76 asio::coroutine coro_{};
77
78public:
79 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
80 : stream_ {stream}
81 , buf_ {std::move(buf)}
82 , size_{size}
83 { }
84
85 template <class Self>
86 void operator()( Self& self
87 , system::error_code ec = {}
88 , std::size_t n = 0)
89 {
90 BOOST_ASIO_CORO_REENTER (coro_)
91 {
92 tmp_ = buf_.size();
93 buf_.grow(size_);
94
95 BOOST_ASIO_CORO_YIELD
96 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
97 if (ec) {
98 self.complete(ec, 0);
99 return;
100 }
101
102 buf_.shrink(buf_.size() - tmp_ - n);
103 self.complete({}, n);
104 }
105 }
106};
107
108template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
109auto
110async_append_some(
111 AsyncReadStream& stream,
112 DynamicBuffer buffer,
113 std::size_t size,
114 CompletionToken&& token)
115{
116 return asio::async_compose
117 < CompletionToken
118 , void(system::error_code, std::size_t)
119 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
120}
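// Usage sketch for async_append_some (illustrative; `sock`, `ex` and `storage` are
// hypothetical): the buffer is grown by `size`, a single read_some is issued into
// the new region, and the unused part of the growth is shrunk away on completion.
//
//    asio::ip::tcp::socket sock{ex};  // assumed already connected
//    std::string storage;
//    async_append_some(sock, asio::dynamic_buffer(storage), 512,
//       [&](system::error_code ec, std::size_t n) {
//          // On success exactly n bytes were appended to storage.
//       });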
121
122template <class Conn>
123struct exec_op {
124 using req_info_type = typename Conn::req_info;
125 using adapter_type = typename Conn::adapter_type;
126
127 Conn* conn_ = nullptr;
128 std::shared_ptr<req_info_type> info_ = nullptr;
129 asio::coroutine coro{};
130
131 template <class Self>
132 void operator()(Self& self , system::error_code = {}, std::size_t = 0)
133 {
134 BOOST_ASIO_CORO_REENTER (coro)
135 {
136 // Check whether the user wants to wait for the connection to
137 // be established.
138 if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
139 BOOST_ASIO_CORO_YIELD
140 asio::dispatch(
141 asio::get_associated_immediate_executor(self, self.get_io_executor()),
142 std::move(self));
143 return self.complete(error::not_connected, 0);
144 }
145
146 conn_->add_request_info(info_);
147
148EXEC_OP_WAIT:
149 BOOST_ASIO_CORO_YIELD
150 info_->async_wait(std::move(self));
151
152 if (info_->ec_) {
153 self.complete(info_->ec_, 0);
154 return;
155 }
156
157 if (info_->stop_requested()) {
158 // Don't have to call remove_request as it has already
159 // been done by cancel(exec).
160 return self.complete(asio::error::operation_aborted, 0);
161 }
162
163 if (is_cancelled(self)) {
164 if (!info_->is_waiting()) {
165 using c_t = asio::cancellation_type;
166 auto const c = self.get_cancellation_state().cancelled();
167 if ((c & c_t::terminal) != c_t::none) {
168 // Cancellation requires closing the connection
169 // otherwise it stays in inconsistent state.
170 conn_->cancel(operation::run);
171 return self.complete(asio::error::operation_aborted, 0);
172 } else {
173 // Can't implement other cancellation types, ignoring.
174 self.get_cancellation_state().clear();
175
176 // TODO: Find out a better way to ignore
177 // cancellation.
178 goto EXEC_OP_WAIT;
179 }
180 } else {
181 // Cancellation can be honored.
182 conn_->remove_request(info_);
183 self.complete(asio::error::operation_aborted, 0);
184 return;
185 }
186 }
187
188 self.complete(info_->ec_, info_->read_size_);
189 }
190 }
191};
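// Illustration of the cancellation policy implemented above (sketch; `conn`,
// `req` and `resp` are assumed to exist): if cancellation arrives while the
// request is still waiting it is simply removed; once it has been written, only
// terminal cancellation is honored, which closes the connection to keep it in a
// consistent state.
//
//    conn.async_exec(req, resp,
//       asio::cancel_after(std::chrono::seconds{1},
//          [](system::error_code ec, std::size_t) {
//             // ec == asio::error::operation_aborted if the timer fired first.
//          }));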
192
193template <class Conn, class Logger>
194struct writer_op {
195 Conn* conn_;
196 Logger logger_;
197 asio::coroutine coro{};
198
199 template <class Self>
200 void operator()( Self& self
201 , system::error_code ec = {}
202 , std::size_t n = 0)
203 {
204 ignore_unused(n);
205
206 BOOST_ASIO_CORO_REENTER (coro) for (;;)
207 {
208 while (conn_->coalesce_requests()) {
209 if (conn_->use_ssl())
210 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
211 else
212 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
213
214 logger_.on_write(ec, conn_->write_buffer_);
215
216 if (ec) {
217 logger_.trace("writer_op (1)", ec);
218 conn_->cancel(operation::run);
219 self.complete(ec);
220 return;
221 }
222
223 conn_->on_write();
224
225 // A socket.close() may have been called while a
226 // successful write was already queued, so we have
227 // to check here before proceeding.
228 if (!conn_->is_open()) {
229 logger_.trace("writer_op (2): connection is closed.");
230 self.complete({});
231 return;
232 }
233 }
234
235 BOOST_ASIO_CORO_YIELD
236 conn_->writer_timer_.async_wait(std::move(self));
237 if (!conn_->is_open()) {
238 logger_.trace("writer_op (3): connection is closed.");
239 // Notice this is not an error of the op: stopping was
240 // requested from the outside, so we complete with
241 // success.
242 self.complete({});
243 return;
244 }
245 }
246 }
247};
248
249template <class Conn, class Logger>
250struct reader_op {
251 using parse_result = typename Conn::parse_result;
252 using parse_ret_type = typename Conn::parse_ret_type;
253 Conn* conn_;
254 Logger logger_;
255 parse_ret_type res_{parse_result::resp, 0};
256 asio::coroutine coro{};
257
258 template <class Self>
259 void operator()( Self& self
260 , system::error_code ec = {}
261 , std::size_t n = 0)
262 {
263 ignore_unused(n);
264
265 BOOST_ASIO_CORO_REENTER (coro) for (;;)
266 {
267 // Appends some data to the buffer if necessary.
268 if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
269 if (conn_->use_ssl()) {
270 BOOST_ASIO_CORO_YIELD
271 async_append_some(
272 conn_->next_layer(),
273 conn_->dbuf_,
274 conn_->get_suggested_buffer_growth(),
275 std::move(self));
276 } else {
277 BOOST_ASIO_CORO_YIELD
278 async_append_some(
279 conn_->next_layer().next_layer(),
280 conn_->dbuf_,
281 conn_->get_suggested_buffer_growth(),
282 std::move(self));
283 }
284
285 logger_.on_read(ec, n);
286
287 // The connection is not viable after an error.
288 if (ec) {
289 logger_.trace("reader_op (1)", ec);
290 conn_->cancel(operation::run);
291 self.complete(ec);
292 return;
293 }
294
295 // Somebody might have canceled, implicitly or explicitly,
296 // while we were suspended and after queueing, so we have
297 // to check.
298 if (!conn_->is_open()) {
299 logger_.trace("reader_op (2): connection is closed.");
300 self.complete(ec);
301 return;
302 }
303 }
304
305 res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
306 if (ec) {
307 logger_.trace("reader_op (3)", ec);
308 conn_->cancel(operation::run);
309 self.complete(ec);
310 return;
311 }
312
313 if (res_.first == parse_result::push) {
314 if (!conn_->receive_channel_.try_send(ec, res_.second)) {
315 BOOST_ASIO_CORO_YIELD
316 conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
317 }
318
319 if (ec) {
320 logger_.trace("reader_op (4)", ec);
321 conn_->cancel(operation::run);
322 self.complete(ec);
323 return;
324 }
325
326 if (!conn_->is_open()) {
327 logger_.trace("reader_op (5): connection is closed.");
328 self.complete(asio::error::operation_aborted);
329 return;
330 }
331
332 }
333 }
334 }
335};
336
337template <class Conn, class Logger>
338class run_op {
339private:
340 Conn* conn_ = nullptr;
341 Logger logger_;
342 asio::coroutine coro_{};
343
344 using order_t = std::array<std::size_t, 5>;
345
346public:
347 run_op(Conn* conn, Logger l)
348 : conn_{conn}
349 , logger_{l}
350 {}
351
352 template <class Self>
353 void operator()( Self& self
354 , order_t order = {}
355 , system::error_code ec0 = {}
356 , system::error_code ec1 = {}
357 , system::error_code ec2 = {}
358 , system::error_code ec3 = {}
359 , system::error_code ec4 = {})
360 {
361 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
362 {
363 BOOST_ASIO_CORO_YIELD
364 conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
365
366 logger_.on_resolve(ec0, conn_->resv_.results());
367
368 if (ec0) {
369 self.complete(ec0);
370 return;
371 }
372
373 BOOST_ASIO_CORO_YIELD
374 conn_->ctor_.async_connect(
375 conn_->next_layer().next_layer(),
376 conn_->resv_.results(),
377 asio::prepend(std::move(self), order_t {}));
378
379 logger_.on_connect(ec0, conn_->ctor_.endpoint());
380
381 if (ec0) {
382 self.complete(ec0);
383 return;
384 }
385
386 if (conn_->use_ssl()) {
387 BOOST_ASIO_CORO_YIELD
388 conn_->next_layer().async_handshake(
389 asio::ssl::stream_base::client,
390 asio::prepend(
391 asio::cancel_after(
392 conn_->cfg_.ssl_handshake_timeout,
393 std::move(self)
394 ),
395 order_t {}
396 )
397 );
398
399 logger_.on_ssl_handshake(ec0);
400
401 if (ec0) {
402 self.complete(ec0);
403 return;
404 }
405 }
406
407 conn_->reset();
408
409 // Note: Order is important here because the writer might
410 // trigger an async_write before the async_hello, thereby
411 // causing an authentication problem.
412 BOOST_ASIO_CORO_YIELD
413 asio::experimental::make_parallel_group(
414 [this](auto token) { return conn_->handshaker_.async_hello(*conn_, logger_, token); },
415 [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
416 [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
417 [this](auto token) { return conn_->reader(logger_, token);},
418 [this](auto token) { return conn_->writer(logger_, token);}
419 ).async_wait(
420 asio::experimental::wait_for_one_error(),
421 std::move(self));
422
423 if (order[0] == 0 && !!ec0) {
424 self.complete(ec0);
425 return;
426 }
427
428 if (order[0] == 2 && ec2 == error::pong_timeout) {
429 self.complete(ec1);
430 return;
431 }
432
433 // The receive operation must be cancelled because channel
434 // subscription does not survive a reconnection but requires
435 // re-subscription.
436 conn_->cancel(operation::receive);
437
438 if (!conn_->will_reconnect()) {
439 conn_->cancel(operation::reconnection);
440 self.complete(ec3);
441 return;
442 }
443
444 // It is safe to use the writer timer here because we are not
445 // connected.
446 conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
447
448 BOOST_ASIO_CORO_YIELD
449 conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
450 if (ec0) {
451 self.complete(ec0);
452 return;
453 }
454
455 if (!conn_->will_reconnect()) {
456 self.complete(asio::error::operation_aborted);
457 return;
458 }
459
460 conn_->reset_stream();
461 }
462 }
463};
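// Configuration sketch related to the reconnection loop above (values are
// illustrative): reconnect_wait_interval controls the delay between attempts
// and a zero interval disables reconnection altogether.
//
//    config cfg;
//    cfg.reconnect_wait_interval = std::chrono::seconds{2}; // retry every 2s
//    // cfg.reconnect_wait_interval = std::chrono::seconds{0}; // never reconnect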
464
465} // boost::redis::detail
466
477template <class Executor>
478class basic_connection {
479public:
481
483 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
484
486 using executor_type = Executor;
487
488 /// Returns the associated executor.
489 executor_type get_executor() noexcept
490 {return writer_timer_.get_executor();}
491
492 /// Rebinds the socket type to another executor.
493 template <class Executor1>
494 struct rebind_executor
495 {
496 /// The connection type when rebound to the specified executor.
497 using other = basic_connection<Executor1>;
498 };
499
507 explicit
508 basic_connection(
509 executor_type ex,
510 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
511 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
512 : ctx_{std::move(ctx)}
513 , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
514 , writer_timer_{ex}
515 , receive_channel_{ex, 256}
516 , resv_{ex}
517 , health_checker_{ex}
518 , dbuf_{read_buffer_, max_read_size}
519 {
521 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
522 }
523
524 /// Constructs from a context.
525 explicit
526 basic_connection(
527 asio::io_context& ioc,
528 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
529 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
530 : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size)
531 { }
532
570 template <
571 class Logger = logger,
572 class CompletionToken = asio::default_completion_token_t<executor_type>>
573 auto
574 async_run(
575 config const& cfg = {},
576 Logger l = Logger{},
577 CompletionToken&& token = {})
578 {
579 cfg_ = cfg;
580 resv_.set_config(cfg);
581 ctor_.set_config(cfg);
582 health_checker_.set_config(cfg);
583 handshaker_.set_config(cfg);
584 l.set_prefix(cfg.log_prefix);
585
586 return asio::async_compose
587 < CompletionToken
588 , void(system::error_code)
589 >(detail::run_op<this_type, Logger>{this, l}, token, writer_timer_);
590 }
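   // Minimal async_run usage sketch (illustrative; assumes the default config,
   // which points at 127.0.0.1:6379, and asio::detached as the completion token):
   //
   //    asio::io_context ioc;
   //    boost::redis::connection conn{ioc};
   //
   //    boost::redis::config cfg;
   //    conn.async_run(cfg, {}, asio::detached);
   //
   //    // ... async_exec / async_receive calls go here ...
   //
   //    ioc.run();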
591
614 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
615 auto async_receive(CompletionToken&& token = {})
616 { return receive_channel_.async_receive(std::forward<CompletionToken>(token)); }
617
629 std::size_t receive(system::error_code& ec)
630 {
631 std::size_t size = 0;
632
633 auto f = [&](system::error_code const& ec2, std::size_t n)
634 {
635 ec = ec2;
636 size = n;
637 };
638
639 auto const res = receive_channel_.try_receive(f);
640 if (ec)
641 return 0;
642
643 if (!res)
644 ec = error::sync_receive_push_failed;
645
646 return size;
647 }
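   // Sketch of consuming server pushes (illustrative; assumes `conn` is running
   // and a SUBSCRIBE request has already been executed):
   //
   //    boost::redis::generic_response resp;
   //    conn.set_receive_response(resp);
   //
   //    conn.async_receive([&](system::error_code ec, std::size_t /*n*/) {
   //       if (!ec) {
   //          // Inspect resp, clear it, then call async_receive again.
   //       }
   //    });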
648
672 template <
673 class Response = ignore_t,
674 class CompletionToken = asio::default_completion_token_t<executor_type>
675 >
676 auto
677 async_exec(
678 request const& req,
679 Response& resp = ignore,
680 CompletionToken&& token = {})
681 {
682 return this->async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
683 }
684
690 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
691 auto
692 async_exec(
693 request const& req,
694 any_adapter adapter,
695 CompletionToken&& token = {})
696 {
697 auto& adapter_impl = adapter.impl_;
698 BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes.");
699
700 auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn), get_executor());
701
702 return asio::async_compose
703 < CompletionToken
704 , void(system::error_code, std::size_t)
705 >(detail::exec_op<this_type>{this, info}, token, writer_timer_);
706 }
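   // Minimal async_exec usage sketch (illustrative; assumes `conn` is running):
   //
   //    boost::redis::request req;
   //    req.push("PING", "Hello world");
   //
   //    boost::redis::response<std::string> resp;
   //
   //    conn.async_exec(req, resp, [&](system::error_code ec, std::size_t /*n*/) {
   //       if (!ec) {
   //          // std::get<0>(resp).value() now holds "Hello world".
   //       }
   //    });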
707
719 void cancel(operation op = operation::all)
720 {
721 switch (op) {
722 case operation::resolve:
723 resv_.cancel();
724 break;
725 case operation::exec:
726 cancel_unwritten_requests();
727 break;
728 case operation::reconnection:
729 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
730 break;
731 case operation::run:
732 cancel_run();
733 break;
734 case operation::receive:
735 receive_channel_.cancel();
736 break;
737 case operation::health_check:
738 health_checker_.cancel();
739 break;
740 case operation::all:
741 resv_.cancel();
742 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
743 health_checker_.cancel();
744 cancel_run(); // run
745 receive_channel_.cancel(); // receive
746 cancel_unwritten_requests(); // exec
747 break;
748 default: /* ignore */;
749 }
750 }
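   // Shutdown sketch (illustrative; `conn` is a running connection): stop the
   // reconnection loop first, then tear the connection down, or cancel everything.
   //
   //    conn.cancel(operation::reconnection);
   //    conn.cancel(operation::run);
   //    // or simply: conn.cancel(); // operation::all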
751
752 auto run_is_canceled() const noexcept
753 { return cancel_run_called_; }
754
756 bool will_reconnect() const noexcept
757 { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
758
760 auto const& get_ssl_context() const noexcept
761 { return ctx_;}
762
763 /// Resets the underlying stream.
764 void reset_stream()
765 { stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_); }
766
768 auto& next_layer() noexcept
769 { return *stream_; }
770
772 auto const& next_layer() const noexcept
773 { return *stream_; }
775 template <class Response>
776 void set_receive_response(Response& response)
777 {
778 using namespace boost::redis::adapter;
779 auto g = boost_redis_adapt(response);
780 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
781 }
782
784 usage get_usage() const noexcept
785 { return usage_; }
786
787private:
788 using clock_type = std::chrono::steady_clock;
789 using clock_traits_type = asio::wait_traits<clock_type>;
790 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
791
792 using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
793 using resolver_type = detail::resolver<Executor>;
794 using health_checker_type = detail::health_checker<Executor>;
795 using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
796 using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
797 using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
798 using exec_notifier_type = receive_channel_type;
799
800 auto use_ssl() const noexcept
801 { return cfg_.use_ssl;}
802
803 auto cancel_on_conn_lost() -> std::size_t
804 {
805 // Must return false if the request should be removed.
806 auto cond = [](auto const& ptr)
807 {
808 BOOST_ASSERT(ptr != nullptr);
809
810 if (ptr->is_waiting()) {
811 return !ptr->req_->get_config().cancel_on_connection_lost;
812 } else {
813 return !ptr->req_->get_config().cancel_if_unresponded;
814 }
815 };
816
817 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
818
819 auto const ret = std::distance(point, std::end(reqs_));
820
821 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
822 ptr->stop();
823 });
824
825 reqs_.erase(point, std::end(reqs_));
826
827 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
828 return ptr->mark_waiting();
829 });
830
831 return ret;
832 }
833
834 auto cancel_unwritten_requests() -> std::size_t
835 {
836 auto f = [](auto const& ptr)
837 {
838 BOOST_ASSERT(ptr != nullptr);
839 return !ptr->is_waiting();
840 };
841
842 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
843
844 auto const ret = std::distance(point, std::end(reqs_));
845
846 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
847 ptr->stop();
848 });
849
850 reqs_.erase(point, std::end(reqs_));
851 return ret;
852 }
853
854 void cancel_run()
855 {
856 // Protects the code below from being called more than
857 // once, see https://github.com/boostorg/redis/issues/181
858 if (std::exchange(cancel_run_called_, true)) {
859 return;
860 }
861
862 close();
863 writer_timer_.cancel();
864 receive_channel_.cancel();
865 cancel_on_conn_lost();
866 }
867
868 void on_write()
869 {
870 // We have to clear the payload right after writing it so that it
871 // can be used as a flag indicating there is no ongoing write.
872 write_buffer_.clear();
873
874 // Notice this must come before the for-each below.
875 cancel_push_requests();
876
877 // There is a small optimization possible here: traverse only the
878 // partition of unwritten requests instead of all of them.
879 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
880 BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
881 if (ptr->is_staged()) {
882 ptr->mark_written();
883 }
884 });
885 }
886
887 struct req_info {
888 public:
889 using node_type = resp3::basic_node<std::string_view>;
890 using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
891
892 explicit req_info(request const& req, adapter_type adapter, executor_type ex)
893 : notifier_{ex, 1}
894 , req_{&req}
895 , adapter_{}
896 , expected_responses_{req.get_expected_responses()}
897 , status_{status::waiting}
898 , ec_{{}}
899 , read_size_{0}
900 {
901 adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
902 {
903 auto const i = req_->get_expected_responses() - expected_responses_;
904 adapter(i, nd, ec);
905 };
906 }
907
908 auto proceed()
909 {
910 notifier_.try_send(std::error_code{}, 0);
911 }
912
913 void stop()
914 {
915 notifier_.close();
916 }
917
918 [[nodiscard]] auto is_waiting() const noexcept
919 { return status_ == status::waiting; }
920
921 [[nodiscard]] auto is_written() const noexcept
922 { return status_ == status::written; }
923
924 [[nodiscard]] auto is_staged() const noexcept
925 { return status_ == status::staged; }
926
927 void mark_written() noexcept
928 { status_ = status::written; }
929
930 void mark_staged() noexcept
931 { status_ = status::staged; }
932
933 void mark_waiting() noexcept
934 { status_ = status::waiting; }
935
936 [[nodiscard]] auto stop_requested() const noexcept
937 { return !notifier_.is_open();}
938
939 template <class CompletionToken>
940 auto async_wait(CompletionToken&& token)
941 {
942 return notifier_.async_receive(std::forward<CompletionToken>(token));
943 }
944
945 //private:
946 enum class status
947 { waiting
948 , staged
949 , written
950 };
951
952 exec_notifier_type notifier_;
953 request const* req_;
954 wrapped_adapter_type adapter_;
955
956 // Contains the number of commands that haven't been read yet.
957 std::size_t expected_responses_;
958 status status_;
959
960 system::error_code ec_;
961 std::size_t read_size_;
962 };
963
964 void remove_request(std::shared_ptr<req_info> const& info)
965 {
966 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
967 }
968
969 using reqs_type = std::deque<std::shared_ptr<req_info>>;
970
971 template <class, class> friend struct detail::reader_op;
972 template <class, class> friend struct detail::writer_op;
973 template <class> friend struct detail::exec_op;
974 template <class, class> friend class detail::run_op;
975
976 void cancel_push_requests()
977 {
978 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
979 return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
980 });
981
982 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
983 ptr->proceed();
984 });
985
986 reqs_.erase(point, std::end(reqs_));
987 }
988
989 [[nodiscard]] bool is_writing() const noexcept
990 {
991 return !write_buffer_.empty();
992 }
993
994 void add_request_info(std::shared_ptr<req_info> const& info)
995 {
996 reqs_.push_back(info);
997
998 if (info->req_->has_hello_priority()) {
999 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
1000 return e->is_waiting();
1001 });
1002
1003 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
1004 }
1005
1006 if (is_open() && !is_writing())
1007 writer_timer_.cancel();
1008 }
1009
1010 template <class CompletionToken, class Logger>
1011 auto reader(Logger l, CompletionToken&& token)
1012 {
1013 return asio::async_compose
1014 < CompletionToken
1015 , void(system::error_code)
1016 >(detail::reader_op<this_type, Logger>{this, l},
1017 std::forward<CompletionToken>(token), writer_timer_);
1018 }
1019
1020 template <class CompletionToken, class Logger>
1021 auto writer(Logger l, CompletionToken&& token)
1022 {
1023 return asio::async_compose
1024 < CompletionToken
1025 , void(system::error_code)
1026 >(detail::writer_op<this_type, Logger>{this, l}, std::forward<CompletionToken>(token), writer_timer_);
1027 }
1028
1029 [[nodiscard]] bool coalesce_requests()
1030 {
1031 // Coalesces the requests and marks them staged. After a
1032 // successful write, staged requests will be marked as written.
1033 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
1034 return !ri->is_waiting();
1035 });
1036
1037 std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
1038 // Stage the request.
1039 write_buffer_ += ri->req_->payload();
1040 ri->mark_staged();
1041 usage_.commands_sent += ri->expected_responses_;
1042 });
1043
1044 usage_.bytes_sent += std::size(write_buffer_);
1045
1046 return point != std::cend(reqs_);
1047 }
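   // Illustration of the coalescing above (sketch; req1/req2 and resp1/resp2 are
   // hypothetical): requests issued while a write is in flight are staged and then
   // written to the socket as a single payload, answered strictly in order.
   //
   //    conn.async_exec(req1, resp1, asio::detached);
   //    conn.async_exec(req2, resp2, asio::detached); // coalesced with req1 if unwritten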
1048
1049 bool is_waiting_response() const noexcept
1050 {
1051 if (std::empty(reqs_))
1052 return false;
1053
1054 // Under load and on low-latency networks we might start
1055 // receiving responses before the write operation has completed,
1056 // while the request is still marked as staged and not written. See
1057 // https://github.com/boostorg/redis/issues/170
1058 return !reqs_.front()->is_waiting();
1059 }
1060
1061 void close()
1062 {
1063 if (stream_->next_layer().is_open()) {
1064 system::error_code ec;
1065 stream_->next_layer().close(ec);
1066 }
1067 }
1068
1069 auto is_open() const noexcept { return stream_->next_layer().is_open(); }
1070 auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
1071
1072 auto is_next_push()
1073 {
1074 BOOST_ASSERT(!read_buffer_.empty());
1075
1076 // Useful links to understand the heuristics below.
1077 //
1078 // - https://github.com/redis/redis/issues/11784
1079 // - https://github.com/redis/redis/issues/6426
1080 // - https://github.com/boostorg/redis/issues/170
1081
1082 // The message's resp3 type is a push.
1083 if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
1084 return true;
1085
1086 // This is a non-push type and the request queue is empty. I have
1087 // noticed this is possible, for example with -MISCONF. I don't
1088 // know why they are not sent with a push type, which would let us
1089 // distinguish them from responses to commands. If we are lucky
1090 // enough to receive them when the command queue is empty they
1091 // can be treated as server pushes, otherwise it is impossible
1092 // to handle them properly.
1093 if (reqs_.empty())
1094 return true;
1095
1096 // The request does not expect any response but we got one. This
1097 // may happen if, for example, a subscribe command was sent with the wrong syntax.
1098 if (reqs_.front()->expected_responses_ == 0)
1099 return true;
1100
1101 // Added to deal with MONITOR and also to fix PR170 which
1102 // happens under load and on low-latency networks, where we
1103 // might start receiving responses before the write operation
1104 // completed and the request is still marked as staged and not
1105 // written.
1106 return reqs_.front()->is_waiting();
1107 }
1108
1109 auto get_suggested_buffer_growth() const noexcept
1110 {
1111 return parser_.get_suggested_buffer_growth(4096);
1112 }
1113
1114 enum class parse_result { needs_more, push, resp };
1115
1116 using parse_ret_type = std::pair<parse_result, std::size_t>;
1117
1118 parse_ret_type on_finish_parsing(parse_result t)
1119 {
1120 if (t == parse_result::push) {
1121 usage_.pushes_received += 1;
1122 usage_.push_bytes_received += parser_.get_consumed();
1123 } else {
1124 usage_.responses_received += 1;
1125 usage_.response_bytes_received += parser_.get_consumed();
1126 }
1127
1128 on_push_ = false;
1129 dbuf_.consume(parser_.get_consumed());
1130 auto const res = std::make_pair(t, parser_.get_consumed());
1131 parser_.reset();
1132 return res;
1133 }
1134
1135 parse_ret_type on_read(std::string_view data, system::error_code& ec)
1136 {
1137 // We arrive here in two states:
1138 //
1139 // 1. While we are parsing a message. In this case we
1140 // don't want to determine the type of the message in the
1141 // buffer (i.e. response vs push) but leave it untouched
1142 // until the parsing of a complete message ends.
1143 //
1144 // 2. On a new message, in which case we have to determine
1145 // whether the next message is a push or a response.
1146 //
1147 if (!on_push_) // Prepare for new message.
1148 on_push_ = is_next_push();
1149
1150 if (on_push_) {
1151 if (!resp3::parse(parser_, data, receive_adapter_, ec))
1152 return std::make_pair(parse_result::needs_more, 0);
1153
1154 if (ec)
1155 return std::make_pair(parse_result::push, 0);
1156
1157 return on_finish_parsing(parse_result::push);
1158 }
1159
1160 BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
1161 BOOST_ASSERT(!reqs_.empty());
1162 BOOST_ASSERT(reqs_.front() != nullptr);
1163 BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
1164
1165 if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
1166 return std::make_pair(parse_result::needs_more, 0);
1167
1168 if (ec) {
1169 reqs_.front()->ec_ = ec;
1170 reqs_.front()->proceed();
1171 return std::make_pair(parse_result::resp, 0);
1172 }
1173
1174 reqs_.front()->read_size_ += parser_.get_consumed();
1175
1176 if (--reqs_.front()->expected_responses_ == 0) {
1177 // Done with this request.
1178 reqs_.front()->proceed();
1179 reqs_.pop_front();
1180 }
1181
1182 return on_finish_parsing(parse_result::resp);
1183 }
1184
1185 void reset()
1186 {
1187 write_buffer_.clear();
1188 read_buffer_.clear();
1189 parser_.reset();
1190 on_push_ = false;
1191 cancel_run_called_ = false;
1192 }
1193
1194 asio::ssl::context ctx_;
1195 std::unique_ptr<next_layer_type> stream_;
1196
1197 // Notice we use a timer to simulate a condition variable. It is
1198 // more suitable than a channel here because the notify operation
1199 // does not suspend.
1200 timer_type writer_timer_;
1201 receive_channel_type receive_channel_;
1202 resolver_type resv_;
1203 detail::connector ctor_;
1204 health_checker_type health_checker_;
1205 resp3_handshaker_type handshaker_;
1206 receiver_adapter_type receive_adapter_;
1207
1208 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
1209
1210 config cfg_;
1211 std::string read_buffer_;
1212 dyn_buffer_type dbuf_;
1213 std::string write_buffer_;
1214 reqs_type reqs_;
1215 resp3::parser parser_{};
1216 bool on_push_ = false;
1217 bool cancel_run_called_ = false;
1218
1219 usage usage_;
1220};
1221
1231class connection {
1232public:
1234 using executor_type = asio::any_io_executor;
1235
1237 explicit
1238 connection(
1239 executor_type ex,
1240 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1241 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1242
1244 explicit
1245 connection(
1246 asio::io_context& ioc,
1247 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1248 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1249
1250 /// Returns the underlying executor.
1251 executor_type get_executor() noexcept
1252 { return impl_.get_executor(); }
1253
1255 template <class CompletionToken = asio::deferred_t>
1256 auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
1257 {
1258 return asio::async_initiate<
1259 CompletionToken, void(boost::system::error_code)>(
1260 [](auto handler, connection* self, config const* cfg, logger l)
1261 {
1262 self->async_run_impl(*cfg, l, std::move(handler));
1263 }, token, this, &cfg, l);
1264 }
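   // Coroutine-based sketch combining async_run and async_exec (illustrative;
   // assumes C++20 coroutines, asio::co_spawn and the headers they require):
   //
   //    auto co_main(boost::redis::config cfg) -> asio::awaitable<void>
   //    {
   //       auto conn = std::make_shared<boost::redis::connection>(
   //          co_await asio::this_coro::executor);
   //       conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
   //
   //       boost::redis::request req;
   //       req.push("PING");
   //       boost::redis::response<std::string> resp;
   //       co_await conn->async_exec(req, resp, asio::deferred);
   //
   //       conn->cancel();
   //    }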
1265
1267 template <class CompletionToken = asio::deferred_t>
1268 auto async_receive(CompletionToken&& token = {})
1269 { return impl_.async_receive(std::forward<CompletionToken>(token)); }
1270
1272 std::size_t receive(system::error_code& ec)
1273 {
1274 return impl_.receive(ec);
1275 }
1276
1278 template <class Response, class CompletionToken = asio::deferred_t>
1279 auto async_exec(request const& req, Response& resp, CompletionToken&& token = {})
1280 {
1281 return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
1282 }
1283
1285 template <class CompletionToken = asio::deferred_t>
1286 auto
1287 async_exec(
1288 request const& req,
1289 any_adapter adapter,
1290 CompletionToken&& token = {})
1291 {
1292 return asio::async_initiate<
1293 CompletionToken, void(boost::system::error_code, std::size_t)>(
1294 [](auto handler, connection* self, request const* req, any_adapter&& adapter)
1295 {
1296 self->async_exec_impl(*req, std::move(adapter), std::move(handler));
1297 }, token, this, &req, std::move(adapter));
1298 }
1299
1301 void cancel(operation op = operation::all);
1302
1304 bool will_reconnect() const noexcept
1305 { return impl_.will_reconnect();}
1306
1308 auto& next_layer() noexcept
1309 { return impl_.next_layer(); }
1310
1312 auto const& next_layer() const noexcept
1313 { return impl_.next_layer(); }
1314
1315 /// Calls boost::redis::basic_connection::reset_stream.
1316 void reset_stream()
1317 { impl_.reset_stream();}
1318
1320 template <class Response>
1321 void set_receive_response(Response& response)
1322 { impl_.set_receive_response(response); }
1323
1325 usage get_usage() const noexcept
1326 { return impl_.get_usage(); }
1327
1329 auto const& get_ssl_context() const noexcept
1330 { return impl_.get_ssl_context();}
1331
1332private:
1333 void
1334 async_run_impl(
1335 config const& cfg,
1336 logger l,
1337 asio::any_completion_handler<void(boost::system::error_code)> token);
1338
1339 void
1340 async_exec_impl(
1341 request const& req,
1342 any_adapter&& adapter,
1343 asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);
1344
1345 basic_connection<executor_type> impl_;
1346};
1347
1348} // boost::redis
1349
1350#endif // BOOST_REDIS_CONNECTION_HPP