Loading...
Searching...
No Matches
health_checker.hpp
1/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
8#define BOOST_REDIS_HEALTH_CHECKER_HPP
9
10#include <boost/redis/request.hpp>
11#include <boost/redis/response.hpp>
12#include <boost/redis/operation.hpp>
13#include <boost/redis/adapter/any_adapter.hpp>
14#include <boost/redis/config.hpp>
15#include <boost/redis/operation.hpp>
16#include <boost/asio/steady_timer.hpp>
17#include <boost/asio/compose.hpp>
18#include <boost/asio/consign.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/post.hpp>
21#include <memory>
22#include <chrono>
23
24namespace boost::redis::detail {
25
26template <class HealthChecker, class Connection, class Logger>
27class ping_op {
28public:
29 HealthChecker* checker_ = nullptr;
30 Connection* conn_ = nullptr;
31 Logger logger_;
32 asio::coroutine coro_{};
33
34 template <class Self>
35 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
36 {
37 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
38 {
39 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
40 logger_.trace("ping_op (1): timeout disabled.");
41 BOOST_ASIO_CORO_YIELD
42 asio::post(std::move(self));
43 self.complete({});
44 return;
45 }
46
47 if (checker_->checker_has_exited_) {
48 logger_.trace("ping_op (2): checker has exited.");
49 self.complete({});
50 return;
51 }
52
53 BOOST_ASIO_CORO_YIELD
54 conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
55 if (ec) {
56 logger_.trace("ping_op (3)", ec);
57 checker_->wait_timer_.cancel();
58 self.complete(ec);
59 return;
60 }
61
62 // Wait before pinging again.
63 checker_->ping_timer_.expires_after(checker_->ping_interval_);
64
65 BOOST_ASIO_CORO_YIELD
66 checker_->ping_timer_.async_wait(std::move(self));
67 if (ec) {
68 logger_.trace("ping_op (4)", ec);
69 self.complete(ec);
70 return;
71 }
72 }
73 }
74};
75
76template <class HealthChecker, class Connection, class Logger>
77class check_timeout_op {
78public:
79 HealthChecker* checker_ = nullptr;
80 Connection* conn_ = nullptr;
81 Logger logger_;
82 asio::coroutine coro_{};
83
84 template <class Self>
85 void operator()(Self& self, system::error_code ec = {})
86 {
87 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
88 {
89 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
90 logger_.trace("check_timeout_op (1): timeout disabled.");
91 BOOST_ASIO_CORO_YIELD
92 asio::post(std::move(self));
93 self.complete({});
94 return;
95 }
96
97 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
98
99 BOOST_ASIO_CORO_YIELD
100 checker_->wait_timer_.async_wait(std::move(self));
101 if (ec) {
102 logger_.trace("check_timeout_op (2)", ec);
103 self.complete(ec);
104 return;
105 }
106
107 if (checker_->resp_.has_error()) {
108 // TODO: Log the error.
109 logger_.trace("check_timeout_op (3): Response error.");
110 self.complete({});
111 return;
112 }
113
114 if (checker_->resp_.value().empty()) {
115 logger_.trace("check_timeout_op (4): pong timeout.");
116 checker_->ping_timer_.cancel();
117 conn_->cancel(operation::run);
118 checker_->checker_has_exited_ = true;
119 self.complete(error::pong_timeout);
120 return;
121 }
122
123 if (checker_->resp_.has_value()) {
124 checker_->resp_.value().clear();
125 }
126 }
127 }
128};
129
130template <class Executor>
131class health_checker {
132private:
133 using timer_type =
134 asio::basic_waitable_timer<
135 std::chrono::steady_clock,
136 asio::wait_traits<std::chrono::steady_clock>,
137 Executor>;
138
139public:
140 health_checker(Executor ex)
141 : ping_timer_{ex}
142 , wait_timer_{ex}
143 {
144 req_.push("PING", "Boost.Redis");
145 }
146
147 void set_config(config const& cfg)
148 {
149 req_.clear();
150 req_.push("PING", cfg.health_check_id);
151 ping_interval_ = cfg.health_check_interval;
152 }
153
154 void cancel()
155 {
156 ping_timer_.cancel();
157 wait_timer_.cancel();
158 }
159
160 template <class Connection, class Logger, class CompletionToken>
161 auto async_ping(Connection& conn, Logger l, CompletionToken token)
162 {
163 return asio::async_compose
164 < CompletionToken
165 , void(system::error_code)
166 >(ping_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, ping_timer_);
167 }
168
169 template <class Connection, class Logger, class CompletionToken>
170 auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
171 {
172 checker_has_exited_ = false;
173 return asio::async_compose
174 < CompletionToken
175 , void(system::error_code)
176 >(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
177 }
178
179private:
180 template <class, class, class> friend class ping_op;
181 template <class, class, class> friend class check_timeout_op;
182
183 timer_type ping_timer_;
184 timer_type wait_timer_;
185 redis::request req_;
187 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
188 bool checker_has_exited_ = false;
189};
190
191} // boost::redis::detail
192
193#endif // BOOST_REDIS_HEALTH_CHECKER_HPP
void push(std::string_view cmd, Ts const &... args)
Appends a new command to the end of the request.
Definition request.hpp:146
void clear()
Clears the request preserving allocated memory.
Definition request.hpp:102
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition response.hpp:35
@ pong_timeout
Connect timeout.
@ run
Refers to connection::async_run operations.