为BOOST的ASIO增加“条件变量”实现协程间通讯

boost.asio提供了大量的async_*系方法来实现异步操作,配合协程使用非常简单粗暴。

但是实际使用过程中遇到了这样的情况:

已知若干个客户端(下文记作client)向一个队列(下文记作bus)投递消息,如何编程并保证队列不过载、数据不丢失?

如果使用协程来写,这个问题可以非常简单:

void client(yield_context yield) {
	char buf[128];
	size_t len;
	
	while (true) {
		len = read_from_socket(buf, yield);  // 挂起协程直到有数据到达
		write_to_bus(buf, len, yield);  // 挂起协程直到bus可写
	}
}

void server(yield_context yield) {
	char buf[128];
	size_t len;
	
	while (true) {
		len = read_from_bus(buf, yield);  // 挂起协程直到bus可读
		write_to_socket(buf, len, yield);  // 挂起协程直到socket可写,将bus数据发到其他地方
	}
}

这是一个很典型的消费者/生产者模式,其中消费者和生产者的比例是1:N。通常而言,在多线程环境下消费者/生产者模式可以靠条件变量去实现,用作消息通知。

因此,只要自行实现write_to_bus和read_from_bus,使用一种类似“条件变量”的东西在必要的时候挂起和唤醒协程就好。

但是查阅google,翻阅boost.asio,却没能找到这样一种能用于协程间通知事件的东西。相同的问题在stackoverflow上也有好几个,有用deadline_timer去做workaround的,却没有比较好的解决方法。

既然如此,那就只能自己来设计了。由于boost.asio的封装较为复杂,在翻阅源码后可以简单的知道:

  • boost::asio::handler_type<CompletionToken, Signature>用于获取handler的包装来唤醒协程
  • boost::asio::async_result<Handler>用于挂起协程
  • boost::asio::asio_handler_invoke根据调用环境上下文来唤起协程

直接贴代码:

#include <iostream>

#include <cstdint>
#include <array>
#include <queue>
#include <functional>

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/noncopyable.hpp>
#include <boost/system/error_code.hpp>

class async_condition_variable :
	public boost::noncopyable
{
public:
	using signature = auto(boost::system::error_code) -> void;

	template <typename CompletionToken>
	using handler_type_t = typename boost::asio::handler_type<CompletionToken, signature>::type;
	
	template <typename CompletionToken>
	using async_result = typename boost::asio::async_result<handler_type_t<CompletionToken>>;
	
	template <typename CompletionToken>
	using async_result_t = typename async_result<CompletionToken>::type;
private:
	using handler_block = std::array<uint8_t, 128U>;
	
	class handler_holder_base :
		public boost::noncopyable
	{
	public:
		handler_holder_base() {}
		virtual ~handler_holder_base() {}
		 
		virtual void exec(boost::system::error_code ec) = 0;
	};
	
	template <typename CompletionToken>
	class handler_holder :
		public handler_holder_base
	{
	private:
		boost::asio::io_service& m_iosv;
	public:
		handler_type_t<CompletionToken> m_handler;
	
		handler_holder(boost::asio::io_service& iosv, handler_type_t<CompletionToken>&& handler)
			: m_iosv(iosv), m_handler(std::forward<handler_type_t<CompletionToken>&&>(handler)) {}
			
		void exec(boost::system::error_code ec)override
		{
			handler_type_t<CompletionToken> handler = std::move(m_handler);
			m_iosv.post([handler, ec](){ boost::asio::asio_handler_invoke(std::bind(handler, ec), &handler); });
		}
	};
private:
	boost::asio::io_service& m_iosv;
	std::queue<size_t> m_handlers;
	
	// handler storage
	std::vector<handler_block> m_storage;
	std::vector<size_t> m_freelist;
public:
	async_condition_variable(boost::asio::io_service& iosv)
		: m_iosv(iosv) {}
	~async_condition_variable()
	{
	    while (!m_handlers.empty())
	    {
	        size_t idx = m_handlers.front();
			auto holder = reinterpret_cast<handler_holder_base*>(m_storage[idx].data());
			m_handlers.pop();
			try
			{
				holder->exec(boost::asio::error::make_error_code(boost::asio::error::interrupted));
			}
			catch (...)
			{
				free_handler(idx);
				continue;
			}
			free_handler(idx);
	    }
		assert(m_freelist.size() == m_storage.size());
	}
private:
	template <typename CompletionToken>
	size_t alloc_handler(handler_type_t<CompletionToken>&& handler)
	{
		static_assert(sizeof(handler_holder<CompletionToken>) <= std::tuple_size<handler_block>::value, "handler is too bigger.");
		
		if (m_freelist.empty())
		{
			m_storage.emplace_back();
			auto& storage = m_storage.back();
			new(storage.data()) handler_holder<CompletionToken>(m_iosv, std::forward<handler_type_t<CompletionToken>&&>(handler));
			return m_storage.size() - 1;
		}
		else
		{
			size_t idx = m_freelist.back();
			auto& storage = m_storage[idx];
			m_freelist.pop_back();
			new(storage.data()) handler_holder<CompletionToken>(m_iosv, std::forward<handler_type_t<CompletionToken>&&>(handler));
			return idx;
		}
	}
	
	void free_handler(size_t idx)
	{
		auto& storage = m_storage[idx];
		auto holder = reinterpret_cast<handler_holder_base*>(m_storage[idx].data());
		holder->~handler_holder_base();
		m_freelist.push_back(idx);
	}
public:
	template <typename CompletionToken>
	async_result_t<CompletionToken> async_wait(CompletionToken&& token)
	{
		handler_type_t<CompletionToken> handler(std::forward<CompletionToken&&>(token));
		async_result<CompletionToken> result(handler);
		size_t idx = alloc_handler<CompletionToken>(std::move(handler));
		auto holder = reinterpret_cast<handler_holder<CompletionToken>*>(m_storage[idx].data());
		m_handlers.push(idx);
		return result.get();
	}

	void notify_one()
	{
		if (!m_handlers.empty())
		{
			size_t idx = m_handlers.front();
			auto holder = reinterpret_cast<handler_holder_base*>(m_storage[idx].data());
			m_handlers.pop();
			try
			{
				holder->exec(boost::system::error_code());
			}
			catch (...)
			{
				free_handler(idx);
				throw;
			}
			free_handler(idx);
		}
	}
	
	void notify_all()
	{
		while (!m_handlers.empty())
			notify_one();
	}
};

int main()
{
	boost::asio::io_service iosv;
	
	auto cond_var = std::make_shared<async_condition_variable>(iosv);
		
	boost::asio::spawn(iosv, [&](boost::asio::yield_context yield){
		boost::system::error_code ec;
		std::cout << "point 1.1" << std::endl; 
		cond_var->async_wait(yield);
		std::cout << "point 2.1" << std::endl;
	});
	
	boost::asio::spawn(iosv, [&](boost::asio::yield_context yield){
		std::cout << "point 3" << std::endl;
		cond_var->notify_all();
		std::cout << "point 4" << std::endl;
	});
	
	boost::asio::spawn(iosv, [&](boost::asio::yield_context yield){
		boost::system::error_code ec;
		std::cout << "point 1.2" << std::endl; 
		cond_var->async_wait(yield);
		std::cout << "point 2.2" << std::endl;
	});
	
	boost::asio::spawn(iosv, [&](boost::asio::yield_context yield){
		std::cout << "point 5" << std::endl;
		cond_var->notify_all();
		std::cout << "point 6" << std::endl;
	});
	
	iosv.run();
	return 0;
}

编译:

g++ test.cpp -lboost_system -lboost_coroutine -I/usr/local/Cellar/boost/1.59.0/include -L/usr/local/lib/ -o test -std=c++11

这样,实现了一个条件变量来进行协程间的通信。

鸣谢

  • C君(@nadesico19)在boost协程库方面的帮助。

参考