UDPCaster.h

source: wtcpp/folder98/folder08/folder4/file03.md

/*!
 * \file UDPCaster.h
 * \project	WonderTrader
 *
 * \author Wesley
 * \date 2020/03/30
 * 
 * \brief UDP广播对象定义
 */
#pragma once

#include "../Includes/WTSMarcos.h"
#include "../Includes/WTSObject.hpp"
#include "../Share/StdUtils.hpp"

#include <boost/asio.hpp>
#include <queue>

NS_WTP_BEGIN
	class WTSTickData;
	class WTSQueue;
	class WTSVariant;
	class WTSOrdDtlData;
	class WTSOrdQueData;
	class WTSTransData;
NS_WTP_END

USING_NS_WTP;
class WTSBaseDataMgr;
class DataManager;

class UDPCaster
{
public:
	UDPCaster();
	~UDPCaster();

	typedef boost::asio::ip::udp::endpoint EndPoint;
	typedef struct tagUDPReceiver
	{
		EndPoint	_ep;
		uint32_t	_type;


		tagUDPReceiver(EndPoint ep, uint32_t t)
		{
			_ep = ep;
			_type = t;
		}

	} UDPReceiver;
	typedef std::shared_ptr<UDPReceiver>	UDPReceiverPtr;
	typedef std::vector<UDPReceiverPtr>		ReceiverList;

private:
	// 发送广播回调
	void handle_send_broad(const EndPoint& ep, const boost::system::error_code& error, std::size_t bytes_transferred); 
	void handle_send_multi(const EndPoint& ep, const boost::system::error_code& error, std::size_t bytes_transferred); 

	// 接收数据
	void do_receive();
	// 发送数据
	void do_send();
	// 广播
	void broadcast(WTSObject* data, uint32_t dataType);

public:
	bool	init(WTSVariant* cfg, WTSBaseDataMgr* bdMgr, DataManager* dtMgr);
	void	start(int bport);
	void	stop();

	// 添加客户端: 1 V n
	bool	addBRecver(const char* remote, int port, int type = 0);
	// 同时客户端: n V n
	bool	addMRecver(const char* remote, int port, int sendport, int type = 0);

	void	broadcast(WTSTickData* curTick);
	void	broadcast(WTSOrdQueData* curOrdQue);
	void	broadcast(WTSOrdDtlData* curOrdDtl);
	void	broadcast(WTSTransData* curTrans);

private:
	typedef boost::asio::ip::udp::socket	UDPSocket;
	typedef std::shared_ptr<UDPSocket>		UDPSocketPtr;

	enum 
	{ 
		max_length = 2048 
	};

	boost::asio::ip::udp::endpoint	m_senderEP;
	char			m_data[max_length];

	// 广播(客户端列表)
	ReceiverList	m_listFlatRecver;
	ReceiverList	m_listJsonRecver;
	ReceiverList	m_listRawRecver;
	UDPSocketPtr	m_sktBroadcast;
	UDPSocketPtr	m_sktSubscribe;

	typedef std::pair<UDPSocketPtr,UDPReceiverPtr>	MulticastPair;
	typedef std::vector<MulticastPair>	MulticastList;
	MulticastList	m_listFlatGroup;
	MulticastList	m_listJsonGroup;
	MulticastList	m_listRawGroup;
	boost::asio::io_service		m_ioservice;		// 可访问I/O功能的io_service 对象
	StdThreadPtr	m_thrdIO;

	StdThreadPtr	m_thrdCast;
	StdCondVariable	m_condCast;
	StdUniqueMutex	m_mtxCast;
	bool			m_bTerminated;

	WTSBaseDataMgr*	m_bdMgr;
	DataManager*	m_dtMgr;

	typedef struct _CastData
	{
		uint32_t	_datatype;
		WTSObject*	_data;

		_CastData(WTSObject* obj = NULL, uint32_t dataType = 0)
			: _data(obj), _datatype(dataType)
		{
			if (_data)
				_data->retain();
		}

		_CastData(const _CastData& data)
			: _data(data._data), _datatype(data._datatype)
		{
			if (_data)
				_data->retain();
		}

		~_CastData()
		{
			if (_data)
			{
				_data->release();
				_data = NULL;
			}
		}
	} CastData;

	std::queue<CastData>		m_dataQue;
};

UDPCaster.cpp

/*!
 * \file UDPCaster.cpp
 * \project	WonderTrader
 *
 * \author Wesley
 * \date 2020/03/30
 * 
 * \brief 
 */
#include "UDPCaster.h"
#include "DataManager.h"

#include "../Share/StrUtil.hpp"
#include "../Includes/WTSDataDef.hpp"
#include "../Includes/WTSContractInfo.hpp"
#include "../Includes/WTSVariant.hpp"

#include "../WTSTools/WTSBaseDataMgr.h"
#include "../WTSTools/WTSLogger.h"


#define UDP_MSG_SUBSCRIBE	0x100
#define UDP_MSG_PUSHTICK	0x200
#define UDP_MSG_PUSHORDQUE	0x201	//委托队列
#define UDP_MSG_PUSHORDDTL	0x202	//委托明细
#define UDP_MSG_PUSHTRANS	0x203	//逐笔成交

#pragma pack(push,1)
//UDP请求包
typedef struct _UDPReqPacket
{
	uint32_t		_type;
	char			_data[1020];
} UDPReqPacket;

//UDPTick数据包
template <typename T>
struct UDPDataPacket
{
	uint32_t	_type;
	T			_data;
};
#pragma pack(pop)
typedef UDPDataPacket<WTSTickStruct>	UDPTickPacket;
typedef UDPDataPacket<WTSOrdQueStruct>	UDPOrdQuePacket;
typedef UDPDataPacket<WTSOrdDtlStruct>	UDPOrdDtlPacket;
typedef UDPDataPacket<WTSTransStruct>	UDPTransPacket;

UDPCaster::UDPCaster()
	: m_bTerminated(false)
	, m_bdMgr(NULL)
	, m_dtMgr(NULL)
{
	
}


UDPCaster::~UDPCaster()
{
}

// 初始化
bool UDPCaster::init(WTSVariant* cfg, WTSBaseDataMgr* bdMgr, DataManager* dtMgr)
{
	m_bdMgr = bdMgr;
	m_dtMgr = dtMgr;

	// 通过配置文件 dtcfg.yaml 初始化对象
	if (!cfg->getBoolean("active"))
		return false;

	WTSVariant* cfgBC = cfg->get("broadcast");
	if (cfgBC)
	{
		for (uint32_t idx = 0; idx < cfgBC->size(); idx++)
		{
			WTSVariant* cfgItem = cfgBC->get(idx);
			// 添加单广播客户端
			addBRecver(cfgItem->getCString("host"), cfgItem->getInt32("port"), cfgItem->getUInt32("type"));
		}
	}

	WTSVariant* cfgMC = cfg->get("multicast");
	if (cfgMC)
	{
		for (uint32_t idx = 0; idx < cfgMC->size(); idx++)
		{
			WTSVariant* cfgItem = cfgMC->get(idx);
			// 添加多广播客户端
			addMRecver(cfgItem->getCString("host"), cfgItem->getInt32("port"), cfgItem->getInt32("sendport"), cfgItem->getUInt32("type"));
		}
	}

	//By Wesley @ 2022.01.11
	//这是订阅端口,但是以前全部用的bport,属于笔误
	//只能写一个兼容了
	int32_t sport = cfg->getInt32("sport");
	if (sport == 0)
		sport = cfg->getInt32("bport");
	start(sport);

	return true;
}

void UDPCaster::start(int sport)
{
	if (!m_listFlatRecver.empty() || !m_listJsonRecver.empty() || !m_listRawRecver.empty())
	{
		m_sktBroadcast.reset(new UDPSocket(m_ioservice, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)));
		boost::asio::socket_base::broadcast option(true);
		m_sktBroadcast->set_option(option);
	}

	try
	{
		m_sktSubscribe.reset(new UDPSocket(m_ioservice, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), sport)));
	}
	catch(...)
	{
		WTSLogger::error_f("Exception raised while start subscribing service @ port {}", sport);
	}

	do_receive();

	m_thrdIO.reset(new StdThread([this](){
		try
		{
			m_ioservice.run();
		}
		catch(...)
		{
			m_ioservice.stop();
		}
	}));
}

void UDPCaster::stop()
{
	m_bTerminated = true;
	m_ioservice.stop();
	if (m_thrdIO)
		m_thrdIO->join();

	m_condCast.notify_all();
	if (m_thrdCast)
		m_thrdCast->join();
}

void UDPCaster::do_receive()
{
	m_sktSubscribe->async_receive_from(boost::asio::buffer(m_data, max_length), m_senderEP,
		[this](boost::system::error_code ec, std::size_t bytes_recvd)
	{
		if(ec)
		{
			do_receive();
			return;
		}

		if (bytes_recvd == sizeof(UDPReqPacket))
		{
			UDPReqPacket* req = (UDPReqPacket*)m_data;

			std::string data;
			//处理请求
			if (req->_type == UDP_MSG_SUBSCRIBE)
			{
				const StringVector& ay = StrUtil::split(req->_data, ",");
				std::string code, exchg;
				for(const std::string& fullcode : ay)
				{
					auto pos = fullcode.find(".");
					if (pos == std::string::npos)
						code = fullcode;
					else
					{
						code = fullcode.substr(pos + 1);
						exchg = fullcode.substr(0, pos);
					}
					WTSContractInfo* ct = m_bdMgr->getContract(code.c_str(), exchg.c_str());
					if (ct == NULL)
						continue;

					WTSTickData* curTick = m_dtMgr->getCurTick(code.c_str(), exchg.c_str());
					if(curTick == NULL)
						continue;

					std::string* data = new std::string();
					data->resize(sizeof(UDPTickPacket), 0);
					UDPTickPacket* pkt = (UDPTickPacket*)data->data();
					pkt->_type = req->_type;
					memcpy(&pkt->_data, &curTick->getTickStruct(), sizeof(WTSTickStruct));
					curTick->release();
					m_sktSubscribe->async_send_to(
						boost::asio::buffer(*data, data->size()), m_senderEP,
						[this, data](const boost::system::error_code& ec, std::size_t /*bytes_sent*/)
					{
						delete data;
						if (ec)
						{
							WTSLogger::error("Sending data on UDP failed: %s", ec.message().c_str());
						}
					});
				}
			}			
		}
		else
		{
			std::string* data = new std::string("Can not indentify the command");
			m_sktSubscribe->async_send_to(
				boost::asio::buffer(*data, data->size()), m_senderEP,
				[this, data](const boost::system::error_code& ec, std::size_t /*bytes_sent*/)
			{
				delete data;
				if (ec)
				{
					WTSLogger::error("Sending data on UDP failed: %s", ec.message().c_str());
				}
			});
		}

		do_receive();
	});
}

bool UDPCaster::addBRecver(const char* remote, int port, int type /* = 0 */)
{
	try
	{
		// IP, 端口号
		boost::asio::ip::address_v4 addr = boost::asio::ip::address_v4::from_string(remote);
		UDPReceiverPtr item(new UDPReceiver(EndPoint(addr, port), type));
		// 将客户端添加到对应列表中
		if(type == 0)
			m_listFlatRecver.emplace_back(item);
		else if(type == 1)
			m_listJsonRecver.emplace_back(item);
		else if(type == 2)
			m_listRawRecver.emplace_back(item);
	}
	catch(...)
	{
		return false;
	}

	return true;
}

// 多对多模型, 添加sendport
bool UDPCaster::addMRecver(const char* remote, int port, int sendport, int type /* = 0 */)
{
	try
	{
		boost::asio::ip::address_v4 addr = boost::asio::ip::address_v4::from_string(remote);
		UDPReceiverPtr item(new UDPReceiver(EndPoint(addr, port), type));
		UDPSocketPtr sock(new UDPSocket(m_ioservice, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), sendport)));
		boost::asio::ip::multicast::join_group option(item->_ep.address());
		sock->set_option(option);
		if(type == 0)
			m_listFlatGroup.emplace_back(std::make_pair(sock, item));
		else if(type == 1)
			m_listJsonGroup.emplace_back(std::make_pair(sock, item));
		else if(type == 2)
			m_listRawGroup.emplace_back(std::make_pair(sock, item));
	}
	catch(...)
	{
		return false;
	}

	return true;
}

void UDPCaster::broadcast(WTSTickData* curTick)
{
	broadcast(curTick, UDP_MSG_PUSHTICK);
}

void UDPCaster::broadcast(WTSOrdDtlData* curOrdDtl)
{
	broadcast(curOrdDtl, UDP_MSG_PUSHORDDTL);
}

void UDPCaster::broadcast(WTSOrdQueData* curOrdQue)
{
	broadcast(curOrdQue, UDP_MSG_PUSHORDQUE);
}

void UDPCaster::broadcast(WTSTransData* curTrans)
{
	broadcast(curTrans, UDP_MSG_PUSHTRANS);
}

void UDPCaster::broadcast(WTSObject* data, uint32_t dataType)
{
	if(m_sktBroadcast == NULL || data == NULL || m_bTerminated)
		return;

	{
		StdUniqueLock lock(m_mtxCast);
		m_dataQue.push(CastData(data, dataType));
	}

	if(m_thrdCast == NULL)
	{
		m_thrdCast.reset(new StdThread([this](){

			while (!m_bTerminated)
			{
				if(m_dataQue.empty())
				{
					StdUniqueLock lock(m_mtxCast);
					m_condCast.wait(lock);
					continue;
				}	

				std::queue<CastData> tmpQue;
				{
					StdUniqueLock lock(m_mtxCast);
					tmpQue.swap(m_dataQue);
				}
				
				while(!tmpQue.empty())
				{
					const CastData& castData = tmpQue.front();

					if (castData._data == NULL)
						break;

					//直接广播
					if (!m_listRawGroup.empty() || !m_listRawRecver.empty())
					{
						std::string buf_raw;
						if (castData._datatype == UDP_MSG_PUSHTICK)
						{
							buf_raw.resize(sizeof(UDPTickPacket));
							UDPTickPacket* pack = (UDPTickPacket*)buf_raw.data();
							pack->_type = castData._datatype;
							WTSTickData* curObj = (WTSTickData*)castData._data;
							memcpy(&pack->_data, &curObj->getTickStruct(), sizeof(WTSTickStruct));
						}
						else if (castData._datatype == UDP_MSG_PUSHORDDTL)
						{
							buf_raw.resize(sizeof(UDPOrdDtlPacket));
							UDPOrdDtlPacket* pack = (UDPOrdDtlPacket*)buf_raw.data();
							pack->_type = castData._datatype;
							WTSOrdDtlData* curObj = (WTSOrdDtlData*)castData._data;
							memcpy(&pack->_data, &curObj->getOrdDtlStruct(), sizeof(WTSOrdDtlStruct));
						}
						else if (castData._datatype == UDP_MSG_PUSHORDQUE)
						{
							buf_raw.resize(sizeof(UDPOrdQuePacket));
							UDPOrdQuePacket* pack = (UDPOrdQuePacket*)buf_raw.data();
							pack->_type = castData._datatype;
							WTSOrdQueData* curObj = (WTSOrdQueData*)castData._data;
							memcpy(&pack->_data, &curObj->getOrdQueStruct(), sizeof(WTSOrdQueStruct));
						}
						else if (castData._datatype == UDP_MSG_PUSHTRANS)
						{
							buf_raw.resize(sizeof(UDPTransPacket));
							UDPTransPacket* pack = (UDPTransPacket*)buf_raw.data();
							pack->_type = castData._datatype;
							WTSTransData* curObj = (WTSTransData*)castData._data;
							memcpy(&pack->_data, &curObj->getTransStruct(), sizeof(WTSTransStruct));
						}
						else
						{
							break;
						}

						//广播
						boost::system::error_code ec;
						for (auto it = m_listRawRecver.begin(); it != m_listRawRecver.end(); it++)
						{
							const UDPReceiverPtr& receiver = (*it);
							m_sktBroadcast->send_to(boost::asio::buffer(buf_raw), receiver->_ep, 0, ec);
							if (ec)
							{
								WTSLogger::error_f("Error occured while sending to ({}:{}): {}({})", 
									receiver->_ep.address().to_string(), receiver->_ep.port(), ec.value(), ec.message());
							}
						}

						//组播
						for (auto it = m_listRawGroup.begin(); it != m_listRawGroup.end(); it++)
						{
							const MulticastPair& item = *it;
							it->first->send_to(boost::asio::buffer(buf_raw), item.second->_ep, 0, ec);
							if (ec)
							{
								WTSLogger::error_f("Error occured while sending to ({}:{}): {}({})",
									item.second->_ep.address().to_string(), item.second->_ep.port(), ec.value(), ec.message());
							}
						}
					}

					tmpQue.pop();
				} 
			}
		}));
	}
	else
	{
		m_condCast.notify_all();
	}

	//纯文本格式
	/*
	if(!m_listFlatRecver.empty() || !m_listFlatGroup.empty())
	{
		uint32_t curTime = curTick->actiontime()/1000;
		char buf_flat[2048] = {0};
		char *str = buf_flat;
		//日期,时间,买价,卖价,代码,最新价,开,高,低,今结,昨结,总手,现手,总持,增仓,档位[买x价,买x量,卖x价,卖x量]
		str += sprintf(str, "%04d.%02d.%02d,", 
			curTick->actiondate()/10000, curTick->actiondate()%10000/100, curTick->actiondate()%100);
		str += sprintf(str, "%02d:%02d:%02d,", 
			curTime/10000, curTime%10000/100, curTime%100);
		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->bidprice(0)));
		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->askprice(0)));
		str += sprintf(str, "%s,", curTick->code());

		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->price()));
		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->open()));
		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->high()));
		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->low()));
		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->settlepx()));
		str += sprintf(str, "%.2f,", PRICE_INT_TO_DOUBLE(curTick->preclose()));

		str += sprintf(str, "%u,", curTick->totalvolume());
		str += sprintf(str, "%u,", curTick->volume());
		str += sprintf(str, "%u,", curTick->openinterest());
		str += sprintf(str, "%d,", curTick->additional());

		for(int i = 0; i < 5; i++)
		{
			str += sprintf(str, "%.2f,%u,", PRICE_INT_TO_DOUBLE(curTick->bidprice(i)), curTick->bidqty(i));
			str += sprintf(str, "%.2f,%u,", PRICE_INT_TO_DOUBLE(curTick->askprice(i)), curTick->askqty(i));
		}

		for(auto it = m_listFlatRecver.begin(); it != m_listFlatRecver.end(); it++)
		{
			const UDPReceiverPtr& receiver = (*it);
			m_sktBroadcast->send_to(boost::asio::buffer(buf_flat, strlen(buf_flat)), receiver->_ep);
			sendTicks++;
			sendBytes += strlen(buf_flat);
		}

		//组播
		for(auto it = m_listFlatGroup.begin(); it != m_listFlatGroup.end(); it++)
		{
			const MulticastPair& item = *it;
			it->first->send_to(boost::asio::buffer(buf_flat, strlen(buf_flat)), item.second->_ep);
			sendTicks++;
			sendBytes += strlen(buf_flat);
		}
	}
	

	//json格式
	if(!m_listJsonRecver.empty() || !m_listJsonGroup.empty())
	{
		datasvr::TickData newTick;
		newTick.set_market(curTick->market());
		newTick.set_code(curTick->code());

		newTick.set_price(curTick->price());
		newTick.set_open(curTick->open());
		newTick.set_high(curTick->high());
		newTick.set_low(curTick->low());
		newTick.set_preclose(curTick->preclose());
		newTick.set_settlepx(curTick->settlepx());

		newTick.set_totalvolume(curTick->totalvolume());
		newTick.set_volume(curTick->volume());
		newTick.set_totalmoney(curTick->totalturnover());
		newTick.set_money(curTick->turnover());
		newTick.set_openinterest(curTick->openinterest());
		newTick.set_additional(curTick->additional());

		newTick.set_tradingdate(curTick->tradingdate());
		newTick.set_actiondate(curTick->actiondate());
		newTick.set_actiontime(curTick->actiontime());

		for(int i = 0; i < 10; i++)
		{
			if(curTick->bidprice(i) == 0 && curTick->askprice(i) == 0)
				break;

			newTick.add_bidprices(curTick->bidprice(i));
			newTick.add_bidqty(curTick->bidqty(i));

			newTick.add_askprices(curTick->askprice(i));
			newTick.add_askqty(curTick->askqty(i));
		}
		const std::string& buf_json =  pb2json(newTick);

		//广播
		for(auto it = m_listJsonRecver.begin(); it != m_listJsonRecver.end(); it++)
		{
			const UDPReceiverPtr& receiver = (*it);
			m_sktBroadcast->send_to(boost::asio::buffer(buf_json), receiver->_ep);
			sendTicks++;
			sendBytes += buf_json.size();
		}

		//组播
		for(auto it = m_listJsonGroup.begin(); it != m_listJsonGroup.end(); it++)
		{
			const MulticastPair& item = *it;
			it->first->send_to(boost::asio::buffer(buf_json), item.second->_ep);
			sendTicks++;
			sendBytes += buf_json.size();
		}
	}
	*/
}

void UDPCaster::handle_send_broad(const EndPoint& ep, const boost::system::error_code& error, std::size_t bytes_transferred)
{
	if(error)
	{
		WTSLogger::error("Broadcasting of market data failed, remote addr: %s, error message: %s", ep.address().to_string().c_str(), error.message().c_str());
	}
}

void UDPCaster::handle_send_multi(const EndPoint& ep, const boost::system::error_code& error, std::size_t bytes_transferred)
{
	if(error)
	{
		WTSLogger::error("Multicasting of market data failed, remote addr: %s, error message: %s", ep.address().to_string().c_str(), error.message().c_str());
	}
}