
source: wtcpp/folder98/folder05/folder1/file09.md




class IDataSink
	virtual void	handle_tick(const char* stdCode, WTSTickData* curTick) = 0;
	virtual void	handle_order_queue(const char* stdCode, WTSOrdQueData* curOrdQue) {};
	virtual void	handle_order_detail(const char* stdCode, WTSOrdDtlData* curOrdDtl) {};
	virtual void	handle_transaction(const char* stdCode, WTSTransData* curTrans) {};
	virtual void	handle_bar_close(const char* stdCode, const char* period, uint32_t times, WTSBarStruct* newBar) = 0;
	virtual void	handle_schedule(uint32_t uDate, uint32_t uTime) = 0;
	virtual void	handle_init() = 0;
	virtual void	handle_session_begin(uint32_t curTDate) = 0;
	virtual void	handle_session_end(uint32_t curTDate) = 0;
	virtual void	handle_replay_done() {}


历史数据加载器接口, 该接口中有回调函数需要被实现


wtcpp_Includes/04-01数据接口中也有一个历史数据加载器 IHisDataLoader, 二者功能类似

// 历史数据加载器的回调函数, obj: 回传用的(原样返回即可), key: 数据缓存的key, bars: K线数据, count: K线条数, factor: 复权因子, 最新的复权因子, 如果是后复权, 则factor不为1.0, 如果是前复权, 则factor必须为1.0)
typedef void(*FuncReadBars)(void* obj, WTSBarStruct* firstBar, uint32_t count);

// 加载复权因子回调, obj: 回传用的(原样返回即可), stdCode: 合约代码, dates: 
typedef void(*FuncReadFactors)(void* obj, const char* stdCode, uint32_t* dates, double* factors, uint32_t count);

typedef void(*FuncReadTicks)(void* obj, WTSTickStruct* firstTick, uint32_t count);

class IBtDataLoader
	// loadFinalHisBars, 系统认为是最终所需数据, 不再进行加工(例如复权数据, 主力合约数据), loadRawHisBars是加载未加工的原始数据的接口

	// 加载最终历史K线数据, obj: 回传用的(原样返回即可), stdCode: 合约代码, period: K线周期, cb: 回调函数
	virtual bool loadFinalHisBars(void* obj, const char* stdCode, WTSKlinePeriod period, FuncReadBars cb) = 0;

	// 加载原始历史K线数据, obj: 回传用的(原样返回即可), stdCode: 合约代码, period: K线周期, cb: 回调函数
	virtual bool loadRawHisBars(void* obj, const char* stdCode, WTSKlinePeriod period, FuncReadBars cb) = 0;

	// 加载全部除权因子
	virtual bool loadAllAdjFactors(void* obj, FuncReadFactors cb) = 0;

	// 根据合约加载除权因子, stdCode: 合约代码
	virtual bool loadAdjFactors(void* obj, const char* stdCode, FuncReadFactors cb) = 0;

	// 加载历史Tick数据
	virtual bool loadRawHisTicks(void* obj, const char* stdCode, uint32_t uDate, FuncReadTicks cb) = 0;

	// 是否自动转存储
	virtual bool isAutoTrans() { return true; }



class HisDataReplayer

	template <typename T>
	class HftDataList
		std::string		_code;
		uint32_t		_date;
		uint32_t		_cursor;
		uint32_t		_count;

		std::vector<T> _items;

		HftDataList() :_cursor(UINT_MAX), _count(0), _date(0){}

	typedef faster_hashmap<std::string, HftDataList<WTSTickStruct>>		TickCache;
	typedef faster_hashmap<std::string, HftDataList<WTSOrdDtlStruct>>	OrdDtlCache;
	typedef faster_hashmap<std::string, HftDataList<WTSOrdQueStruct>>	OrdQueCache;
	typedef faster_hashmap<std::string, HftDataList<WTSTransStruct>>	TransCache;

	typedef struct _BarsList
		std::string		_code;
		WTSKlinePeriod	_period;
		uint32_t		_cursor;
		uint32_t		_count;
		uint32_t		_times;

		std::vector<WTSBarStruct>	_bars;
		double			_factor;	//最后一条复权因子

		_BarsList() :_cursor(UINT_MAX), _count(0), _times(1), _factor(1){}
	} BarsList;

	typedef faster_hashmap<std::string, BarsList>	BarsCache;

	typedef enum tagTaskPeriodType
		TPT_None,		//不重复
		TPT_Minute = 4,	//分钟线周期
		TPT_Daily = 8,	//每个交易日
		TPT_Weekly,		//每周,遇到节假日的话要顺延
		TPT_Monthly,	//每月,遇到节假日顺延
		TPT_Yearly		//每年,遇到节假日顺延
	} TaskPeriodType;

	typedef struct _TaskInfo
		uint32_t	_id;
		char		_name[16];		// 任务名
		char		_trdtpl[16];	// 交易日模板
		char		_session[16];	// 交易时间模板
		uint32_t	_day;			// 日期, 根据周期变化, 每日为0, 每周为0~6, 对应周日到周六, 每月为1~31, 每年为0101~1231
		uint32_t	_time;			// 时间, 精确到分钟
		bool		_strict_time;	// 是否是严格时间, 严格时间即只有时间相等才会执行, 不是严格时间,则大于等于触发时间都会执行

		uint64_t	_last_exe_time;	// 上次执行时间,主要为了防止重复执行

		TaskPeriodType	_period;	
	} TaskInfo;

	typedef std::shared_ptr<TaskInfo> TaskInfoPtr;

	TickCache		_ticks_cache;	// tick缓存
	OrdDtlCache		_orddtl_cache;	// order detail缓存
	OrdQueCache		_ordque_cache;	// order queue缓存
	TransCache		_trans_cache;	// transaction缓存
	BarsCache		_bars_cache;	// K线缓存
	BarsCache		_unbars_cache;	// 未订阅的K线缓存
	TaskInfoPtr		_task;			// 调度任务指针

	typedef faster_hashset<uint32_t> SIDSet;
	typedef faster_hashmap<std::string, SIDSet>	StraSubMap;
	StraSubMap		_tick_sub_map;		// tick数据订阅表
	StraSubMap		_ordque_sub_map;	// orderqueue数据订阅表
	StraSubMap		_orddtl_sub_map;	// orderdetail数据订阅表
	StraSubMap		_trans_sub_map;		// transaction数据订阅表


	// 从自定义数据文件缓存历史数据
	bool		cacheRawBarsFromBin(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bForBars = true);

	// 从csv文件缓存历史数据
	bool		cacheRawBarsFromCSV(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed = true);

	// 从自定义数据文件缓存历史tick数据
	bool		cacheRawTicksFromBin(const std::string& key, const char* stdCode, uint32_t uDate);

	// 从csv文件缓存历史tick数据
	bool		cacheRawTicksFromCSV(const std::string& key, const char* stdCode, uint32_t uDate);

	// 从外部加载器缓存历史数据
	bool		cacheFinalBarsFromLoader(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed = true);

	// 从外部加载器缓存历史tick数据
	bool		cacheRawTicksFromLoader(const std::string& key, const char* stdCode, uint32_t uDate);

	// 缓存整合的期货合约历史K线(针对.HOT//2ND)
	bool		cacheIntegratedFutBars(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed = true);

	// 缓存复权股票K线数据
	bool		cacheAdjustedStkBars(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed = true);

	void		onMinuteEnd(uint32_t uDate, uint32_t uTime, uint32_t endTDate = 0, bool tickSimulated = true);

	void		loadFees(const char* filename);

	bool		replayHftDatas(uint64_t stime, uint64_t etime);

	uint64_t	replayHftDatasByDay(uint32_t curTDate);

	void		replayUnbars(uint64_t stime, uint64_t etime, uint32_t endTDate = 0);

	inline bool		checkTicks(const char* stdCode, uint32_t uDate);

	inline bool		checkOrderDetails(const char* stdCode, uint32_t uDate);

	inline bool		checkOrderQueues(const char* stdCode, uint32_t uDate);

	inline bool		checkTransactions(const char* stdCode, uint32_t uDate);

	void		checkUnbars();

	bool		loadStkAdjFactorsFromFile(const char* adjfile);

	bool		loadStkAdjFactorsFromLoader();

	bool		checkAllTicks(uint32_t uDate);

	inline	uint64_t	getNextTickTime(uint32_t curTDate, uint64_t stime = UINT64_MAX);
	inline	uint64_t	getNextOrdQueTime(uint32_t curTDate, uint64_t stime = UINT64_MAX);
	inline	uint64_t	getNextOrdDtlTime(uint32_t curTDate, uint64_t stime = UINT64_MAX);
	inline	uint64_t	getNextTransTime(uint32_t curTDate, uint64_t stime = UINT64_MAX);

	void		reset();
	void		dump_btstate(const char* stdCode, WTSKlinePeriod period, uint32_t times, uint64_t stime, uint64_t etime, double progress, int64_t elapse);
	void		notify_state(const char* stdCode, WTSKlinePeriod period, uint32_t times, uint64_t stime, uint64_t etime, double progress);

	uint32_t	locate_barindex(const std::string& key, uint64_t curTime, bool bUpperBound = false);

	void	run_by_bars(bool bNeedDump = false);
	void	run_by_tasks(bool bNeedDump = false);
	void	run_by_ticks(bool bNeedDump = false);

	bool init(WTSVariant* cfg, EventNotifier* notifier = NULL, IBtDataLoader* dataLoader = NULL);
	bool prepare();
	void run(bool bNeedDump = false);
	void stop();
	void clear_cache();

	// 设置数据回放时间
	inline void set_time_range(uint64_t stime, uint64_t etime)
		_begin_time = stime;
		_end_time = etime;

	// 设置是否允许tick
	inline void enable_tick(bool bEnabled = true)
		_tick_enabled = bEnabled;

	// 注册回调函数和策略名称
	inline void register_sink(IDataSink* listener, const char* sinkName) 
		_listener = listener; 
		_stra_name = sinkName;

	void register_task(uint32_t taskid, uint32_t date, uint32_t time, const char* period, const char* trdtpl = "CHINA", const char* session = "TRADING");

	WTSKlineSlice* get_kline_slice(const char* stdCode, const char* period, uint32_t count, uint32_t times = 1, bool isMain = false);

	WTSTickSlice* get_tick_slice(const char* stdCode, uint32_t count, uint64_t etime = 0);

	WTSOrdDtlSlice* get_order_detail_slice(const char* stdCode, uint32_t count, uint64_t etime = 0);

	WTSOrdQueSlice* get_order_queue_slice(const char* stdCode, uint32_t count, uint64_t etime = 0);

	WTSTransSlice* get_transaction_slice(const char* stdCode, uint32_t count, uint64_t etime = 0);

	WTSTickData* get_last_tick(const char* stdCode);

	uint32_t get_date() const{ return _cur_date; }
	uint32_t get_min_time() const{ return _cur_time; }
	uint32_t get_raw_time() const{ return _cur_time; }
	uint32_t get_secs() const{ return _cur_secs; }
	uint32_t get_trading_date() const{ return _cur_tdate; }

	double calc_fee(const char* stdCode, double price, double qty, uint32_t offset);
	WTSSessionInfo*		get_session_info(const char* sid, bool isCode = false);
	WTSCommodityInfo*	get_commodity_info(const char* stdCode);
	double get_cur_price(const char* stdCode);

	void sub_tick(uint32_t sid, const char* stdCode);
	void sub_order_queue(uint32_t sid, const char* stdCode);
	void sub_order_detail(uint32_t sid, const char* stdCode);
	void sub_transaction(uint32_t sid, const char* stdCode);

	inline bool	is_tick_enabled() const{ return _tick_enabled; }

	inline bool	is_tick_simulated() const { return _tick_simulated; }

	inline void update_price(const char* stdCode, double price)
		_price_map[stdCode] = price;

	IDataSink*		_listener;			// 数据回放回调函数接口
	IBtDataLoader*	_bt_loader;			// 数据加载器接口
	std::string		_stra_name;			// 策略名称
	EventNotifier*	_notifier;			// 消息通知

	std::string		_main_key;
	std::string		_min_period;		//最小K线周期,这个主要用于未订阅品种的信号处理上
	bool			_tick_enabled;		//是否开启了tick回测
	bool			_tick_simulated;	//是否需要模拟tick
	std::map<std::string, WTSTickStruct>	_day_cache;		//每日Tick缓存,当tick回放未开放时,会用到该缓存
	std::map<std::string, std::string>		_ticker_keys;

	uint32_t		_cur_date;
	uint32_t		_cur_time;
	uint32_t		_cur_secs;
	uint32_t		_cur_tdate;
	uint32_t		_closed_tdate;
	uint32_t		_opened_tdate;

	WTSBaseDataMgr	_bd_mgr;			// 基础数据管理器接口
	WTSHotMgr		_hot_mgr;			// 主力合约管理器接口

	std::string		_base_dir;			// 数据文件路径
	std::string		_mode;				// 数据模式
	uint64_t		_begin_time;		// 回测开始时间
	uint64_t		_end_time;			// 回测结束时间

	bool			_running;			// 是否还在运行中
	bool			_terminated;		// 是否终端停止运行

	typedef struct _FeeItem
		double	_open;
		double	_close;
		double	_close_today;
		bool	_by_volume;

			memset(this, 0, sizeof(_FeeItem));
	} FeeItem;
	typedef faster_hashmap<std::string, FeeItem> FeeMap;
	FeeMap _fee_map;

	typedef faster_hashmap<std::string, double> PriceMap;
	PriceMap _price_map;

	// 除权因子列表
	typedef struct _AdjFactor
		uint32_t	_date;
		double		_factor;
	} AdjFactor;
	typedef std::vector<AdjFactor> AdjFactorList;
	typedef faster_hashmap<std::string, AdjFactorList>	AdjFactorMap;
	AdjFactorMap	_adj_factors;

	const AdjFactorList& getAdjFactors(const char* code, const char* exchg, const char* pid);


run_by_…逻辑略复杂, 以后再研究吧

	: _listener(NULL)
	, _cur_date(0)
	, _cur_time(0)
	, _cur_secs(0)
	, _cur_tdate(0)
	, _tick_enabled(true)
	, _opened_tdate(0)
	, _closed_tdate(0)
	, _tick_simulated(true)
	, _running(false)
	, _begin_time(0)
	, _end_time(0)
	, _bt_loader(NULL)


// 利用 config.json 文件初始化对象, 同时加载其他 json 文件
bool HisDataReplayer::init(WTSVariant* cfg, EventNotifier* notifier /* = NULL */, IBtDataLoader* dataLoader /* = NULL */)
	// 添加事件通知和数据加载对象指针
	_notifier = notifier;
	_bt_loader = dataLoader;

	_mode = cfg->getCString("mode");
	_base_dir = StrUtil::standardisePath(cfg->getCString("path"));
	bool isRangeCfg = (_begin_time == 0 || _end_time == 0);//是否从配置文件读取回测区间
	if(_begin_time == 0)
		_begin_time = cfg->getUInt64("stime");

	if(_end_time == 0)
		_end_time = cfg->getUInt64("etime");

	WTSLogger::info(fmt::format("Backtest time range is set to be [{},{}] via config", _begin_time, _end_time).c_str());

	_tick_enabled = cfg->getBoolean("tick");
	WTSLogger::info("Tick data replaying is %s", _tick_enabled ? "enabled" : "disabled");

	WTSVariant* cfgBF = cfg->get("basefiles");
	if (cfgBF->get("session"))

	WTSVariant* cfgItem = cfgBF->get("commodity");
	if (cfgItem)
		if (cfgItem->type() == WTSVariant::VT_String)
		else if (cfgItem->type() == WTSVariant::VT_Array)
			for(uint32_t i = 0; i < cfgItem->size(); i ++)

	cfgItem = cfgBF->get("contract");
	if (cfgItem)
		if (cfgItem->type() == WTSVariant::VT_String)
		else if (cfgItem->type() == WTSVariant::VT_Array)
			for (uint32_t i = 0; i < cfgItem->size(); i++)

	if (cfgBF->get("holiday"))

	if (cfgBF->get("hot"))

	if (cfgBF->get("second"))

	// 加载手续费模板

	// 先从_bt_loader加载除权因子, 如果加载失败, 并且配置了除权因子文件, 再加载除权因子文件
	bool bLoaded = loadStkAdjFactorsFromLoader();
	if (!bLoaded && cfg->has("adjfactor"))

	return true;

// 从 _bt_loader 加载除权数据
bool HisDataReplayer::loadStkAdjFactorsFromLoader()
	if (NULL == _bt_loader)
		return false;

	bool ret = _bt_loader->loadAllAdjFactors(this, [](void* obj, const char* stdCode, uint32_t* dates, double* factors, uint32_t count) {
		HisDataReplayer* replayer = (HisDataReplayer*)obj;
		AdjFactorList& fctrLst = replayer->_adj_factors[stdCode];

		for (uint32_t i = 0; i < count; i++)
			AdjFactor adjFact;
			adjFact._date = dates[i];
			adjFact._factor = factors[i];

		// 一定要把第一条加进去,不然如果是前复权的话,可能会漏处理最早的数据
		AdjFactor adjFact;
		adjFact._date = 19900101;
		adjFact._factor = 1;

		std::sort(fctrLst.begin(), fctrLst.end(), [](const AdjFactor& left, const AdjFactor& right) {
			return left._date < right._date;

	if (ret) WTSLogger::info("Adjusting factors of %u contracts loaded via extended loader", _adj_factors.size());
	return ret;

// 从文件中加载除权因子
bool HisDataReplayer::loadStkAdjFactorsFromFile(const char* adjfile)
	if (!boost::filesystem::exists(adjfile))
		WTSLogger::error("Adjust factor file %s not exists", adjfile);
		return false;

	std::string content;
	StdFile::read_file_content(adjfile, content);

	rj::Document doc;

	if (doc.HasParseError())
		WTSLogger::error("Parsing adjust factor file %s faield", adjfile);
		return false;

	uint32_t stk_cnt = 0;
	uint32_t fct_cnt = 0;
	for (auto& mExchg : doc.GetObject())
		const char* exchg = mExchg.name.GetString();
		const rj::Value& itemExchg = mExchg.value;
		for (auto& mCode : itemExchg.GetObject())
			std::string code = mCode.name.GetString();
			const rj::Value& ayFacts = mCode.value;
			if (!ayFacts.IsArray())

			 *	By Wesley @ 2021.12.21
			 *	先检查code的格式是不是包含PID,如STK.600000
			 *	如果包含PID,则直接格式化,如果不包含,则强制为STK
			bool bHasPID = (code.find('.') != std::string::npos);

			std::string key;
				key = StrUtil::printf("%s.%s", exchg, code.c_str());
				key = StrUtil::printf("%s.STK.s", exchg, code.c_str());

			AdjFactorList& fctrLst = _adj_factors[key];
			for (auto& fItem : ayFacts.GetArray())
				AdjFactor adjFact;
				adjFact._date = fItem["date"].GetUint();
				adjFact._factor = fItem["factor"].GetDouble();


			AdjFactor adjFact;
			adjFact._date = 19900101;
			adjFact._factor = 1;

			std::sort(fctrLst.begin(), fctrLst.end(), [](const AdjFactor& left, const AdjFactor& right) {
				return left._date < right._date;

	WTSLogger::info("%u items of adjust factors for %u stocks loaded from %s", fct_cnt, stk_cnt, adjfile);
	return true;

// 注册任务
void HisDataReplayer::register_task(uint32_t taskid, uint32_t date, uint32_t time, const char* period, const char* trdtpl /* = "CHINA" */, const char* session /* = "TRADING" */)
	TaskPeriodType ptype;
	if (wt_stricmp(period, "d") == 0)
		ptype = TPT_Daily;
	else if (wt_stricmp(period, "w") == 0)
		ptype = TPT_Weekly;
	else if (wt_stricmp(period, "m") == 0)
		ptype = TPT_Monthly;
	else if (wt_stricmp(period, "y") == 0)
		ptype = TPT_Yearly;
	else if (wt_stricmp(period, "min") == 0)
		ptype = TPT_Minute;
		ptype = TPT_None;

	_task.reset(new TaskInfo);
	strcpy(_task->_name, "sel");
	strcpy(_task->_trdtpl, trdtpl);
	strcpy(_task->_session, session);
	_task->_day = date;
	_task->_time = time;
	_task->_id = taskid;
	_task->_period = ptype;
	_task->_strict_time = true;

	WTSLogger::info("Timed task registration succeed, frequency: %s", period);

// 清除数据缓存
void HisDataReplayer::clear_cache()

	WTSLogger::warn("All cached data cleared");

// 重置不会清除掉缓存,而是将读取的标记还原,这样不用重复加载主句
void HisDataReplayer::reset()
	for(auto& m : _ticks_cache)
		HftDataList<WTSTickStruct>& cacheItem = (HftDataList<WTSTickStruct>&)m.second;
		cacheItem._cursor = UINT_MAX;

	for (auto& m : _orddtl_cache)
		HftDataList<WTSOrdDtlStruct>& cacheItem = (HftDataList<WTSOrdDtlStruct>&)m.second;
		cacheItem._cursor = UINT_MAX;

	for (auto& m : _ordque_cache)
		HftDataList<WTSOrdQueStruct>& cacheItem = (HftDataList<WTSOrdQueStruct>&)m.second;
		cacheItem._cursor = UINT_MAX;

	for (auto& m : _trans_cache)
		HftDataList<WTSTransStruct>& cacheItem = (HftDataList<WTSTransStruct>&)m.second;
		cacheItem._cursor = UINT_MAX;

	for (auto& m : _bars_cache)
		BarsList& cacheItem = (BarsList&)m.second;
		cacheItem._cursor = UINT_MAX;

		WTSLogger::info("Reading flag of %s has been reset", m.first.c_str());

	// 以下变量在数据回放过程中都会被更新

	_main_key = "";
	_min_period = "";

	_cur_date = 0;
	_cur_time = 0;
	_cur_secs = 0;
	_cur_tdate = 0;
	_opened_tdate = 0;
	_closed_tdate = 0;
	_tick_simulated = true;

// 状态数据保存
void HisDataReplayer::dump_btstate(const char* stdCode, WTSKlinePeriod period, uint32_t times, uint64_t stime, uint64_t etime, double progress, int64_t elapse)
	std::string output;
		rj::Document root(rj::kObjectType);
		rj::Document::AllocatorType &allocator = root.GetAllocator();

		root.AddMember("code", rj::Value(stdCode, allocator), allocator);

		std::stringstream ss;
		if (period == KP_DAY)
			ss << "d";
		else if (period == KP_Minute1)
			ss << "m" << times;
			ss << "m" << times * 5;

		root.AddMember("period", rj::Value(ss.str().c_str(), allocator), allocator);
		root.AddMember("stime", stime, allocator);
		root.AddMember("etime", etime, allocator);
		root.AddMember("progress", progress, allocator);
		root.AddMember("elapse", elapse, allocator);

		rj::StringBuffer sb;
		rj::PrettyWriter<rj::StringBuffer> writer(sb);

		output = sb.GetString();

	std::string folder = WtHelper::getOutputDir();
	folder += _stra_name;
	folder += "/";
	std::string filename = folder + "btenv.json";
	StdFile::write_file_content(filename.c_str(), output.c_str(), output.size());

// 状态通知
void HisDataReplayer::notify_state(const char* stdCode, WTSKlinePeriod period, uint32_t times, uint64_t stime, uint64_t etime, double progress)
	if (!_notifier)

	std::string output;
		rj::Document root(rj::kObjectType);
		rj::Document::AllocatorType &allocator = root.GetAllocator();

		root.AddMember("code", rj::Value(stdCode, allocator), allocator);

		std::stringstream ss;
		if (period == KP_DAY)
			ss << "d";
		else if (period == KP_Minute1)
			ss << "m" << times;
			ss << "m" << times * 5;

		root.AddMember("period", rj::Value(ss.str().c_str(), allocator), allocator);
		root.AddMember("stime", stime, allocator);
		root.AddMember("etime", etime, allocator);
		root.AddMember("progress", progress, allocator);

		rj::StringBuffer sb;
		rj::PrettyWriter<rj::StringBuffer> writer(sb);

		output = sb.GetString();
	_notifier->notifyData("BT_STATE", (void*)output.c_str(), output.size());

// 定位到key对应BarsList中, now(时间)的那根K线位置
uint32_t HisDataReplayer::locate_barindex(const std::string& key, uint64_t now, bool bUpperBound /* = false */)
	uint32_t curDate, curTime;					// 当前日期和时间
	curDate = (uint32_t)(now / 10000);
	curTime = (uint32_t)(now % 10000);

	BarsList& barsList = _bars_cache[key];		// 
	bool isDay = (barsList._period == KP_DAY);

	WTSBarStruct bar;							// 
	bar.date = curDate;							// 
	bar.time = (curDate - 19900000) * 10000 + curTime;

	// std::lower_bound() 是在区间内找到第一个大于等于 value 的值的位置并返回,如果没找到就返回 end() 位置。而 std::upper_bound() 是找到第一个大于 value 值的位置并返回,如果找不到同样返回 end() 位置

	// 定位到 curDate, curTime 位置的那根bar
	auto it = std::lower_bound(barsList._bars.begin(), barsList._bars.end(), bar, [isDay](const WTSBarStruct& a, const WTSBarStruct& b) {
		if (isDay)
			return a.date < b.date;
			return a.time < b.time;

	uint32_t idx;
	if (it == barsList._bars.end())
		idx = barsList._bars.size() - 1;
			if ((isDay && it->date > bar.date) || (!isDay && it->time > bar.time))
		idx = it - barsList._bars.begin();
	return idx;

// 停止数据回放
void HisDataReplayer::stop()
		WTSLogger::error("Backtesting is not running, no need to stop");

	if (_terminated)

	_terminated = true;
	WTSLogger::warn("Terminating flag reset to true, backtesting will quit at next round");

// 回放准备
bool HisDataReplayer::prepare()
	// 不能同时运行多个回测任务
	if (_running)
		WTSLogger::error("Cannot run more than one backtesting task at the same time");
		return false;

	_running = true;
	_terminated = false;
	// 将所有变量和数据缓存初始化

	_cur_date = (uint32_t)(_begin_time / 10000);
	_cur_time = (uint32_t)(_begin_time % 10000);
	_cur_secs = 0;

	// 为何传入的是 "TRADING"? 因为配置文件有 sessions.json
	_cur_tdate = _bd_mgr.calcTradingDate(DEFAULT_SESSIONID, _cur_date, _cur_time, true);

	if (_notifier)

	// 调用IDataSink接口(在策略模板中实现)中的回调函数

	if (!_tick_enabled)

	return true;

// K线回放主逻辑
void HisDataReplayer::run(bool bNeedDump/* = false*/)
	if(_task == NULL)

		if (_main_key.empty() && !_bars_cache.empty())
			WTSKlinePeriod minPeriod = KP_DAY;
			uint32_t minTimes = 1;
			for(auto& m : _bars_cache)
				const BarsList& barList = m.second;
				if (barList._period < minPeriod)
					minPeriod = barList._period;
					minTimes = barList._times;
					_main_key = m.first;
				else if(barList._period == minPeriod)
					if(barList._times < minTimes)
						_main_key = m.first;
						minTimes = barList._times;

			WTSLogger::info("Main K bars automatic determined: %s", _main_key.c_str());

		else if(_tick_enabled)
			WTSLogger::info("Main K bars not subscribed and backtesting of tick data not available , replaying done");
			if (_notifier)
	else //if(_task != NULL)

	_running = false;

void HisDataReplayer::run_by_ticks(bool bNeedDump /* = false */)
	uint32_t edt = (uint32_t)(_end_time / 10000);
	uint32_t etime = (uint32_t)(_end_time % 10000);
	uint64_t end_tdate = _bd_mgr.calcTradingDate(DEFAULT_SESSIONID, edt, etime, true);

	while (_cur_tdate <= end_tdate && !_terminated)
		if (checkAllTicks(_cur_tdate))
			WTSLogger::info("Start to replay tick data of %u...", _cur_tdate);

		_cur_tdate = TimeUtils::getNextDate(_cur_tdate);

	if (_terminated)
		WTSLogger::debug("Replaying by ticks terminated forcely");

	WTSLogger::info("All back data replayed, replaying done");
	if (_notifier)

void HisDataReplayer::run_by_bars(bool bNeedDump /* = false */)
	int64_t now = TimeUtils::getLocalTimeNano();

	BarsList& barList = _bars_cache[_main_key];
	WTSSessionInfo* sInfo = get_session_info(barList._code.c_str(), true);
	std::string commId = CodeHelper::stdCodeToStdCommID(barList._code.c_str());

	uint32_t sIdx = locate_barindex(_main_key, _begin_time, false);
	uint32_t eIdx = locate_barindex(_main_key, _end_time, true);

	uint32_t total_barcnt = eIdx - sIdx + 1;
	uint32_t replayed_barcnt = 0;

	notify_state(barList._code.c_str(), barList._period, barList._times, _begin_time, _end_time, 0);

	if (bNeedDump)
		dump_btstate(barList._code.c_str(), barList._period, barList._times, _begin_time, _end_time, 100.0, TimeUtils::getLocalTimeNano() - now);

	WTSLogger::log_raw(LL_INFO, fmt::format("Start to replay back data from {}...", _begin_time).c_str());

	for (; !_terminated;)
		bool isDay = barList._period == KP_DAY;
		if (barList._cursor != UINT_MAX)
			uint64_t nextBarTime = 0;
			if (isDay)
				nextBarTime = (uint64_t)barList._bars[barList._cursor].date * 10000 + sInfo->getCloseTime();
				nextBarTime = (uint64_t)barList._bars[barList._cursor].time;
				nextBarTime += 199000000000;

			if (nextBarTime > _end_time)
				WTSLogger::log_raw(LL_INFO, fmt::format("{} is beyond ending time {},replaying done", nextBarTime, _end_time).c_str());

			uint32_t nextDate = (uint32_t)(nextBarTime / 10000);
			uint32_t nextTime = (uint32_t)(nextBarTime % 10000);

			uint32_t nextTDate = _bd_mgr.calcTradingDate(commId.c_str(), nextDate, nextTime, false);
			if (_opened_tdate != nextTDate)
				_opened_tdate = nextTDate;
				_cur_tdate = nextTDate;

			uint64_t curBarTime = (uint64_t)_cur_date * 10000 + _cur_time;
			if (_tick_enabled)
				_tick_simulated = !replayHftDatas(curBarTime, nextBarTime);

			_cur_date = nextDate;
			_cur_time = nextTime;
			_cur_secs = 0;

			uint32_t offTime = sInfo->offsetTime(_cur_time);
			bool isEndTDate = (offTime >= sInfo->getCloseTime(true));

			if (!_tick_enabled)
				replayUnbars(curBarTime, nextBarTime, (isDay || isEndTDate) ? nextTDate : 0);

			onMinuteEnd(nextDate, nextTime, (isDay || isEndTDate) ? nextTDate : 0, _tick_simulated);

			replayed_barcnt += 1;

			if (isEndTDate && _closed_tdate != _cur_tdate)
				_closed_tdate = _cur_tdate;

			notify_state(barList._code.c_str(), barList._period, barList._times, _begin_time, _end_time, replayed_barcnt*100.0 / total_barcnt);

			if (barList._cursor >= barList._bars.size())
				WTSLogger::info("All back data replayed, replaying done");
			WTSLogger::error("No back data initialized, replaying canceled");

	if (_terminated)
		WTSLogger::debug("Replaying by bars terminated forcely");

	notify_state(barList._code.c_str(), barList._period, barList._times, _begin_time, _end_time, 100);
	if (_notifier)

	if (_closed_tdate != _cur_tdate)

	if (bNeedDump)
		dump_btstate(barList._code.c_str(), barList._period, barList._times, _begin_time, _end_time, 100.0, TimeUtils::getLocalTimeNano() - now);


void HisDataReplayer::run_by_tasks(bool bNeedDump /* = false */)
	WTSSessionInfo* sInfo = NULL;
	const char* DEF_SESS = (strlen(_task->_session) == 0) ? DEFAULT_SESSIONID : _task->_session;
	sInfo = _bd_mgr.getSession(DEF_SESS);
	WTSLogger::log_raw(LL_INFO, fmt::format("Start to backtest with task frequency from {}...", _begin_time).c_str());

	if (_task->_period != TPT_Minute)
		uint32_t endtime = TimeUtils::getNextMinute(_task->_time, -1);
		bool bIsPreDay = endtime > _task->_time;
		if (bIsPreDay)
			_cur_date = TimeUtils::getNextDate(_cur_date, -1);

		for (; !_terminated;)
			bool fired = false;
			uint32_t preTDate = TimeUtils::getNextDate(_cur_tdate, -1);
			if (_cur_time == endtime)
				if (!_bd_mgr.isHoliday(_task->_trdtpl, _cur_date, true))
					uint32_t weekDay = TimeUtils::getWeekDay(_cur_date);

					bool bHasHoliday = false;
					uint32_t days = 1;
					while (_bd_mgr.isHoliday(_task->_trdtpl, preTDate, true))
						bHasHoliday = true;
						preTDate = TimeUtils::getNextDate(preTDate, -1);
					uint32_t preWD = TimeUtils::getWeekDay(preTDate);

					switch (_task->_period)
					case TPT_Daily:
						fired = true;
					case TPT_Minute:
					case TPT_Monthly:
						//if (preTDate % 1000000 < _task->_day && _cur_date % 1000000 >= _task->_day)
						//	fired = true;
						if (_cur_date % 1000000 == _task->_day)
							fired = true;
						else if (bHasHoliday)
							if ((preTDate % 10000 / 100 < _cur_date % 10000 / 100) && _cur_date % 1000000 > _task->_day)
								fired = true;
							else if (preTDate % 1000000 < _task->_day && _cur_date % 1000000 > _task->_day)
								fired = true;
					case TPT_Weekly:
						//if (preWD < _task->_day && weekDay >= _task->_day)
						//	fired = true;
						if (weekDay == _task->_day)
							fired = true;
						else if (bHasHoliday)
							if (days >= 7 && weekDay > _task->_day)
								fired = true;
							else if (preWD > weekDay && weekDay > _task->_day)
								fired = true;
							else if (preWD < _task->_day && weekDay > _task->_day)
								fired = true;
					case TPT_Yearly:
						if (preTDate % 10000 < _task->_day && _cur_date % 10000 >= _task->_day)
							fired = true;

			if (!fired)
				if (_cur_time < endtime)
					_cur_time = endtime;

				uint32_t newTDate = _bd_mgr.calcTradingDate(DEF_SESS, _cur_date, _cur_time, true);

				if (newTDate != _cur_tdate)
					_cur_tdate = newTDate;
					if (_listener)
					if (_listener)
				uint32_t curDate = _cur_date;
				uint32_t curTime = endtime;
				bool bEndSession = sInfo->offsetTime(curTime) >= sInfo->getCloseTime(true);
				if (_listener)
				onMinuteEnd(curDate, curTime, bEndSession ? _cur_tdate : preTDate);
				if (_listener)

			_cur_date = TimeUtils::getNextDate(_cur_date);
			_cur_time = endtime;
			_cur_tdate = _bd_mgr.calcTradingDate(DEF_SESS, _cur_date, _cur_time, true);

			uint64_t nextTime = (uint64_t)_cur_date * 10000 + _cur_time;
			if (nextTime > _end_time)
				WTSLogger::info("Backtesting with task frequency is done");
				if (_listener)
					if (_notifier)

		if (_listener)

		for (; !_terminated;)
			uint32_t mins = sInfo->timeToMinutes(_cur_time);
			if (mins % _task->_time != 0)
				mins = mins / _task->_time + _task->_time;
				_cur_time = sInfo->minuteToTime(mins);

			bool bNewTDate = false;
			if (mins < sInfo->getTradingMins())
				onMinuteEnd(_cur_date, _cur_time, 0);
				bNewTDate = true;
				mins = sInfo->getTradingMins();
				_cur_time = sInfo->getCloseTime();

				onMinuteEnd(_cur_date, _cur_time, _cur_tdate);

				if (_listener)

			if (bNewTDate)
				mins = _task->_time;
				uint32_t nextTDate = _bd_mgr.getNextTDate(_task->_trdtpl, _cur_tdate, 1, true);

				if (sInfo->getOffsetMins() != 0)
					if (sInfo->getOffsetMins() > 0)
						_cur_date = _cur_tdate;
						_cur_tdate = nextTDate;
						_cur_tdate = nextTDate;
						_cur_date = _cur_tdate;

				_cur_time = sInfo->minuteToTime(mins);

				if (_listener)
				mins += _task->_time;
				uint32_t newTime = sInfo->minuteToTime(mins);
				bool bNewDay = newTime < _cur_time;
				if (bNewDay)
					_cur_date = TimeUtils::getNextDate(_cur_date);

				uint32_t dayMins = _cur_time / 100 * 60 + _cur_time % 100;
				uint32_t nextDMins = newTime / 100 * 60 + newTime % 100;

				bool bNewSec = (nextDMins - dayMins > _task->_time) && !bNewDay;

				while (bNewSec && _bd_mgr.isHoliday(_task->_trdtpl, _cur_date, true))
					_cur_date = TimeUtils::getNextDate(_cur_date);

				_cur_time = newTime;

			uint64_t nextTime = (uint64_t)_cur_date * 10000 + _cur_time;
			if (nextTime > _end_time)
				WTSLogger::info("Backtesting with task frequency is done");
				if (_listener)
					if (_notifier)

void HisDataReplayer::replayUnbars(uint64_t stime, uint64_t nowTime, uint32_t endTDate /* = 0 */)
	//uint64_t nowTime = (uint64_t)uDate * 10000 + uTime;
	uint32_t uDate = (uint32_t)(stime / 10000);

	for (auto& item : _unbars_cache)
		BarsList& barsList = (BarsList&)item.second;
		if (barsList._period != KP_DAY)
			//如果历史数据指标不在尾部, 说明是回测模式, 要继续回放历史数据
			if (barsList._bars.size() > barsList._cursor)
				for (;;)
					WTSBarStruct& nextBar = barsList._bars[barsList._cursor];

					uint64_t barTime = 199000000000 + nextBar.time;
					if (barTime <= nowTime)
						if(barTime <= stime)

						WTSTickStruct& curTS = _day_cache[barsList._code];
						strcpy(curTS.code, barsList._code.c_str());
						curTS.action_date = _cur_date;
						curTS.action_time = _cur_time * 100000;

						curTS.price = nextBar.open;
						curTS.volume = nextBar.vol;

						if (decimal::eq(curTS.open, 0))
							curTS.open = curTS.price;
						curTS.high = max(curTS.price, curTS.high);
						if (decimal::eq(curTS.low, 0))
							curTS.low = curTS.price;
							curTS.low = min(curTS.price, curTS.low);

						WTSTickData* curTick = WTSTickData::create(curTS);
						_listener->handle_tick(barsList._code.c_str(), curTick);

						curTS.price = nextBar.high;
						curTS.volume = nextBar.vol;
						curTS.high = max(curTS.price, curTS.high);
						curTS.low = min(curTS.price, curTS.low);
						curTick = WTSTickData::create(curTS);
						_listener->handle_tick(barsList._code.c_str(), curTick);

						curTS.price = nextBar.low;
						curTS.high = max(curTS.price, curTS.high);
						curTS.low = min(curTS.price, curTS.low);
						curTick = WTSTickData::create(curTS);
						_listener->handle_tick(barsList._code.c_str(), curTick);

						curTS.price = nextBar.close;
						curTS.high = max(curTS.price, curTS.high);
						curTS.low = min(curTS.price, curTS.low);
						curTick = WTSTickData::create(curTS);
						_listener->handle_tick(barsList._code.c_str(), curTick);


					if (barsList._cursor == barsList._bars.size())
			if (barsList._bars.size() > barsList._cursor)
				for (;;)
					WTSBarStruct& nextBar = barsList._bars[barsList._cursor];

					if (nextBar.date <= endTDate)
						if (nextBar.date <= uDate)

						CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(barsList._code.c_str());

						std::string realCode = barsList._code;
						if (cInfo.isStock() && cInfo.isExright())
							realCode = cInfo.pureStdCode();

						WTSSessionInfo* sInfo = get_session_info(realCode.c_str(), true);
						uint32_t curTime = sInfo->getOpenTime();
						WTSTickStruct curTS;
						strcpy(curTS.code, realCode.c_str());
						curTS.action_date = _cur_date;
						curTS.action_time = curTime * 100000;

						curTS.price = nextBar.open;
						curTS.volume = nextBar.vol;
						WTSTickData* curTick = WTSTickData::create(curTS);
						_listener->handle_tick(realCode.c_str(), curTick);

						curTS.price = nextBar.high;
						curTS.volume = nextBar.vol;
						curTick = WTSTickData::create(curTS);
						_listener->handle_tick(realCode.c_str(), curTick);

						curTS.price = nextBar.low;
						curTick = WTSTickData::create(curTS);
						_listener->handle_tick(realCode.c_str(), curTick);

						curTS.price = nextBar.close;
						curTick = WTSTickData::create(curTS);
						_listener->handle_tick(realCode.c_str(), curTick);


					if (barsList._cursor >= barsList._bars.size())

uint64_t HisDataReplayer::getNextTickTime(uint32_t curTDate, uint64_t stime /* = UINT64_MAX */)
	uint64_t nextTime = UINT64_MAX;
	for (auto& v : _tick_sub_map)
		const char* stdCode = v.first.c_str();
		if (!checkTicks(stdCode, curTDate))

		auto& tickList = _ticks_cache[stdCode];
		if (tickList._cursor == UINT_MAX)
			if (stime == UINT64_MAX)
				tickList._cursor = 1;
				uint32_t uDate = (uint32_t)(stime / 10000);
				uint32_t uTime = (uint32_t)(stime % 10000);

				WTSTickStruct curTick;
				curTick.action_date = uDate;
				curTick.action_time = uTime * 100000;

				auto tit = std::lower_bound(tickList._items.begin(), tickList._items.end(), curTick, [](const WTSTickStruct& a, const WTSTickStruct& b) {
					if (a.action_date != b.action_date)
						return a.action_date < b.action_date;
						return a.action_time < b.action_time;

				uint32_t idx = tit - tickList._items.begin();
				tickList._cursor = idx + 1;

		if (tickList._cursor >= tickList._count)

		const WTSTickStruct& nextTick = tickList._items[tickList._cursor - 1];
		uint64_t lastTime = (uint64_t)nextTick.action_date * 1000000000 + nextTick.action_time;

		nextTime = min(lastTime, nextTime);

	return nextTime;

uint64_t HisDataReplayer::getNextTransTime(uint32_t curTDate, uint64_t stime /* = UINT64_MAX */)
	uint64_t nextTime = UINT64_MAX;
	for (auto v : _trans_sub_map)
		std::string stdCode = v.first;
		if (!checkTransactions(stdCode.c_str(), curTDate))

		auto& itemList = _trans_cache[stdCode];
		if (itemList._cursor == UINT_MAX)
			if (stime == UINT64_MAX)
				itemList._cursor = 1;
				uint32_t uDate = (uint32_t)(stime / 10000);
				uint32_t uTime = (uint32_t)(stime % 10000);

				WTSTransStruct curItem;
				curItem.action_date = uDate;
				curItem.action_time = uTime * 100000;

				auto tit = std::lower_bound(itemList._items.begin(), itemList._items.end(), curItem, [](const WTSTransStruct& a, const WTSTransStruct& b) {
					if (a.action_date != b.action_date)
						return a.action_date < b.action_date;
						return a.action_time < b.action_time;

				uint32_t idx = tit - itemList._items.begin();
				itemList._cursor = idx + 1;

		if (itemList._cursor >= itemList._count)

		const auto& nextItem = itemList._items[itemList._cursor - 1];
		uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;

		nextTime = min(lastTime, nextTime);

	return nextTime;

uint64_t HisDataReplayer::getNextOrdDtlTime(uint32_t curTDate, uint64_t stime /* = UINT64_MAX */)
	uint64_t nextTime = UINT64_MAX;
	for (auto v : _orddtl_sub_map)
		std::string stdCode = v.first;
		if (!checkOrderDetails(stdCode.c_str(), curTDate))

		auto& itemList = _orddtl_cache[stdCode];
		if (itemList._cursor == UINT_MAX)
			if (stime == UINT64_MAX)
				itemList._cursor = 1;
				uint32_t uDate = (uint32_t)(stime / 10000);
				uint32_t uTime = (uint32_t)(stime % 10000);

				WTSOrdDtlStruct curItem;
				curItem.action_date = uDate;
				curItem.action_time = uTime * 100000;

				auto tit = std::lower_bound(itemList._items.begin(), itemList._items.end(), curItem, [](const WTSOrdDtlStruct& a, const WTSOrdDtlStruct& b) {
					if (a.action_date != b.action_date)
						return a.action_date < b.action_date;
						return a.action_time < b.action_time;

				uint32_t idx = tit - itemList._items.begin();
				itemList._cursor = idx + 1;

		if (itemList._cursor >= itemList._count)

		const auto& nextItem = itemList._items[itemList._cursor - 1];
		uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;

		nextTime = min(lastTime, nextTime);

	return nextTime;

uint64_t HisDataReplayer::getNextOrdQueTime(uint32_t curTDate, uint64_t stime /* = UINT64_MAX */)
	uint64_t nextTime = UINT64_MAX;
	for (auto v : _ordque_sub_map)
		std::string stdCode = v.first;
		if (!checkOrderQueues(stdCode.c_str(), curTDate))

		auto& itemList = _ordque_cache[stdCode];
		if (itemList._cursor == UINT_MAX)
			if (stime == UINT64_MAX)
				itemList._cursor = 1;
				uint32_t uDate = (uint32_t)(stime / 10000);
				uint32_t uTime = (uint32_t)(stime % 10000);

				WTSOrdQueStruct curItem;
				curItem.action_date = uDate;
				curItem.action_time = uTime * 100000;

				auto tit = std::lower_bound(itemList._items.begin(), itemList._items.end(), curItem, [](const WTSOrdQueStruct& a, const WTSOrdQueStruct& b) {
					if (a.action_date != b.action_date)
						return a.action_date < b.action_date;
						return a.action_time < b.action_time;

				uint32_t idx = tit - itemList._items.begin();
				itemList._cursor = idx + 1;

		if (itemList._cursor >= itemList._count)

		const auto& nextItem = itemList._items[itemList._cursor - 1];
		uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;

		nextTime = min(lastTime, nextTime);

	return nextTime;

uint64_t HisDataReplayer::replayHftDatasByDay(uint32_t curTDate)
	uint64_t total_ticks = 0;
	for (;!_terminated;)
		uint64_t nextTime = min(UINT64_MAX, getNextTickTime(curTDate));
		nextTime = min(nextTime, getNextOrdDtlTime(curTDate));
		nextTime = min(nextTime, getNextOrdQueTime(curTDate));
		nextTime = min(nextTime, getNextTransTime(curTDate));

		if(nextTime == UINT64_MAX)

		_cur_date = (uint32_t)(nextTime / 1000000000);
		_cur_time = nextTime % 1000000000 / 100000;
		_cur_secs = nextTime % 100000;

		for (auto& v : _orddtl_sub_map)
			const char* stdCode = v.first.c_str();
			auto& itemList = _orddtl_cache[stdCode];
			auto& nextItem = itemList._items[itemList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;
			if (lastTime <= nextTime)
				WTSOrdDtlData* newData = WTSOrdDtlData::create(nextItem);
				_listener->handle_order_detail(stdCode, newData);


		for (auto& v : _trans_sub_map)
			const char* stdCode = v.first.c_str();
			auto& itemList = _trans_cache[stdCode];
			auto& nextItem = itemList._items[itemList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;
			if (lastTime <= nextTime)
				WTSTransData* newData = WTSTransData::create(nextItem);
				_listener->handle_transaction(stdCode, newData);


		for (auto& v : _tick_sub_map)
			//std::string stdCode = v.first;
			const char* stdCode = v.first.c_str();
			auto& tickList = _ticks_cache[stdCode];
			WTSTickStruct& nextTick = tickList._items[tickList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextTick.action_date * 1000000000 + nextTick.action_time;
			if (lastTime <= nextTime)
				update_price(stdCode, nextTick.price);
				WTSTickData* newTick = WTSTickData::create(nextTick);
				_listener->handle_tick(stdCode, newTick);

		for (auto& v : _ordque_sub_map)
			const char* stdCode = v.first.c_str();
			auto& itemList = _ordque_cache[stdCode];
			auto& nextItem = itemList._items[itemList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;
			if (lastTime <= nextTime)
				WTSOrdQueData* newData = WTSOrdQueData::create(nextItem);
				_listener->handle_order_queue(stdCode, newData);


	return total_ticks;

bool HisDataReplayer::replayHftDatas(uint64_t stime, uint64_t etime)
	for (;;)
		uint64_t nextTime = min(UINT64_MAX, getNextTickTime(_cur_tdate, stime));
		if (nextTime == UINT64_MAX)
			return false;

		nextTime = min(nextTime, getNextOrdDtlTime(_cur_tdate, stime));
		nextTime = min(nextTime, getNextOrdQueTime(_cur_tdate, stime));
		nextTime = min(nextTime, getNextTransTime(_cur_tdate, stime));

		if (nextTime/100000 >= etime)

		_cur_date = (uint32_t)(nextTime / 1000000000);
		_cur_time = nextTime % 1000000000 / 100000;
		_cur_secs = nextTime % 100000;
		for (auto& v : _orddtl_sub_map)
			const char* stdCode = v.first.c_str();
			auto& itemList = _orddtl_cache[stdCode];
			auto& nextItem = itemList._items[itemList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;
			if (lastTime <= nextTime)
				WTSOrdDtlData* newData = WTSOrdDtlData::create(nextItem);
				_listener->handle_order_detail(stdCode, newData);


		for (auto& v : _trans_sub_map)
			const char* stdCode = v.first.c_str();
			auto& itemList = _trans_cache[stdCode];
			auto& nextItem = itemList._items[itemList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;
			if (lastTime <= nextTime)
				WTSTransData* newData = WTSTransData::create(nextItem);
				_listener->handle_transaction(stdCode, newData);


		for (auto& v : _tick_sub_map)
			const char* stdCode = v.first.c_str();
			auto& itemList = _ticks_cache[stdCode];
			auto& nextItem = itemList._items[itemList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;
			if (lastTime <= nextTime)
				update_price(stdCode, nextItem.price);
				WTSTickData* newData = WTSTickData::create(nextItem);
				_listener->handle_tick(stdCode, newData);


		for (auto& v : _ordque_sub_map)
			const char* stdCode = v.first.c_str();
			auto& itemList = _ordque_cache[stdCode];
			auto& nextItem = itemList._items[itemList._cursor - 1];
			uint64_t lastTime = (uint64_t)nextItem.action_date * 1000000000 + nextItem.action_time;
			if (lastTime <= nextTime)
				WTSOrdQueData* newData = WTSOrdQueData::create(nextItem);
				_listener->handle_order_queue(stdCode, newData);


	return true;

void HisDataReplayer::onMinuteEnd(uint32_t uDate, uint32_t uTime, uint32_t endTDate /* = 0 */, bool tickSimulated /* = true */)
	uint64_t nowTime = (uint64_t)uDate * 10000 + uTime;

	//if(_task && endTDate != 0)
	//	//如果是交易日结束,清理掉分钟线缓存,不然吃内存太多
	//	std::set<std::string> to_clear;
	//	for (auto it = _bars_cache.begin(); it != _bars_cache.end(); it++)
	//	{
	//		const BarsList& barsList = it->second;
	//		if (barsList._period != KP_DAY)
	//			to_clear.insert(it->first);
	//	}

	//	for(const std::string& key : to_clear)
	//	{
	//		auto it = _bars_cache.find(key);
	//		if (it != _bars_cache.end())
	//		{
	//			_bars_cache.erase(it);
	//			WTSLogger::info("Data cache %s cleared", key.c_str());
	//		}
	//	}

	for (auto it = _bars_cache.begin(); it != _bars_cache.end(); it++)
		BarsList& barsList = (BarsList&)it->second;
		double factor = barsList._factor;
		if (barsList._period != KP_DAY)
			//如果历史数据指标不在尾部, 说明是回测模式, 要继续回放历史数据
			if (barsList._bars.size() > barsList._cursor)
				for (;;)
					WTSBarStruct& nextBar = barsList._bars[barsList._cursor];

					uint64_t barTime = 199000000000 + nextBar.time;
					if (barTime <= nowTime)
						//if (_task == NULL)
							if (tickSimulated)
								const std::string& ticker = _ticker_keys[barsList._code];
								if (ticker == it->first)
									WTSTickStruct& curTS = _day_cache[barsList._code];
									strcpy(curTS.code, barsList._code.c_str());
									curTS.action_date = _cur_date;
									curTS.action_time = _cur_time * 100000;

									curTS.price = nextBar.open / factor;
									curTS.volume = nextBar.vol;

									if (decimal::eq(curTS.open, 0))
										curTS.open = curTS.price;
									curTS.high = max(curTS.price, curTS.high);
									if (decimal::eq(curTS.low, 0))
										curTS.low = curTS.price;
										curTS.low = min(curTS.price, curTS.low);

									update_price(barsList._code.c_str(), curTS.price);
									WTSTickData* curTick = WTSTickData::create(curTS);
									_listener->handle_tick(barsList._code.c_str(), curTick);

									curTS.price = nextBar.high / factor;
									curTS.volume = nextBar.vol;
									curTS.high = max(curTS.price, curTS.high);
									curTS.low = min(curTS.price, curTS.low);
									update_price(barsList._code.c_str(), curTS.price);
									curTick = WTSTickData::create(curTS);
									_listener->handle_tick(barsList._code.c_str(), curTick);

									curTS.price = nextBar.low / factor;
									curTS.high = max(curTS.price, curTS.high);
									curTS.low = min(curTS.price, curTS.low);
									update_price(barsList._code.c_str(), curTS.price);
									curTick = WTSTickData::create(curTS);
									_listener->handle_tick(barsList._code.c_str(), curTick);

									curTS.price = nextBar.close / factor;
									curTS.high = max(curTS.price, curTS.high);
									curTS.low = min(curTS.price, curTS.low);
									update_price(barsList._code.c_str(), curTS.price);
									curTick = WTSTickData::create(curTS);
									_listener->handle_tick(barsList._code.c_str(), curTick);

							uint32_t times = barsList._times;
							if (barsList._period == KP_Minute5)
								times *= 5;
							_listener->handle_bar_close(barsList._code.c_str(), "m", times, &nextBar);


					if (barsList._cursor == barsList._bars.size())
			if (barsList._bars.size() > barsList._cursor)
				for (;;)
					WTSBarStruct& nextBar = barsList._bars[barsList._cursor];

					if (nextBar.date <= endTDate)
						//if (_task == NULL)
							if (tickSimulated)
								const std::string& ticker = _ticker_keys[barsList._code];
								if (ticker == it->first)
									CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(barsList._code.c_str());

									std::string realCode = barsList._code;
									if (cInfo.isStock() && cInfo._exright)
										realCode = cInfo.pureStdCode();

									WTSSessionInfo* sInfo = get_session_info(realCode.c_str(), true);
									uint32_t curTime = sInfo->getCloseTime();
									WTSTickStruct curTS;
									strcpy(curTS.code, realCode.c_str());
									curTS.action_date = _cur_date;
									curTS.action_time = curTime * 100000;

									curTS.price = nextBar.open / factor;
									curTS.volume = nextBar.vol;
									update_price(barsList._code.c_str(), curTS.price);
									WTSTickData* curTick = WTSTickData::create(curTS);
									_listener->handle_tick(realCode.c_str(), curTick);

									curTS.price = nextBar.high / factor;
									curTS.volume = nextBar.vol;
									update_price(barsList._code.c_str(), curTS.price);
									curTick = WTSTickData::create(curTS);
									_listener->handle_tick(realCode.c_str(), curTick);

									curTS.price = nextBar.low / factor;
									update_price(barsList._code.c_str(), curTS.price);
									curTick = WTSTickData::create(curTS);
									_listener->handle_tick(realCode.c_str(), curTick);

									curTS.price = nextBar.close / factor;
									update_price(barsList._code.c_str(), curTS.price);
									curTick = WTSTickData::create(curTS);
									_listener->handle_tick(realCode.c_str(), curTick);

							_listener->handle_bar_close(barsList._code.c_str(), "d", barsList._times, &nextBar);


					if (barsList._cursor >= barsList._bars.size())

	if (_listener)
		_listener->handle_schedule(uDate, uTime);

WTSKlineSlice* HisDataReplayer::get_kline_slice(const char* stdCode, const char* period, uint32_t count, uint32_t times /* = 1 */, bool isMain /* = false */)
	std::string key = StrUtil::printf("%s#%s#%u", stdCode, period, times);
	bool isStk = CodeHelper::isStdStkCode(stdCode);
	if (isMain)
		_main_key = key;

	// 添加_ticker_keys
		if(_ticker_keys.find(stdCode) == _ticker_keys.end())
			_ticker_keys[stdCode] = key;
			std::string oldKey = _ticker_keys[stdCode];
			oldKey = oldKey.substr(strlen(stdCode) + 1);
			if (strcmp(period, "m") == 0 && oldKey.at(0) == 'd')
				_ticker_keys[stdCode] = key;
				_min_period = period;
			else if (oldKey.at(0) == period[0] && times < strtoul(oldKey.substr(2, 1).c_str(), NULL, 10))
				_ticker_keys[stdCode] = key;
				_min_period = period;

	// 判断K线周期
	WTSKlinePeriod kp;
	uint32_t realTimes = times;
	if (strcmp(period, "m") == 0)
		if(times % 5 == 0)
			kp = KP_Minute5;
			realTimes /= 5;
			kp = KP_Minute1;
		kp = KP_DAY;

	// 是否是日线
	bool isDay = kp == KP_DAY;

	// 判断是否已有K线缓存
	auto it = _bars_cache.find(key);
	bool bHasHisData = false;
	bool bHasCache = (it != _bars_cache.end());
	// 如果没有缓存
	if (!bHasCache)
		// 时间周期不等于(1min,1d,5min的倍数)
		if (realTimes != 1)
			std::string rawKey = StrUtil::printf("%s#%s#%u", stdCode, period, 1);
			if (_bars_cache.find(rawKey) == _bars_cache.end())
				 *	By Wesley @ 2021.12.20
				 *	先从extloader加载数据,如果加载不到,再走原来的历史数据存储引擎加载
				if(NULL != _bt_loader)
					bHasHisData = cacheFinalBarsFromLoader(rawKey, stdCode, kp);
					if (_mode == "csv")
						bHasHisData = cacheRawBarsFromCSV(rawKey, stdCode, kp);
						bHasHisData = cacheRawBarsFromBin(rawKey, stdCode, kp);
				bHasHisData = true;
			 *	By Wesley @ 2021.12.20
			 *	先从extloader加载数据,如果加载不到,再走原来的历史数据存储引擎加载
			if (NULL != _bt_loader)
				bHasHisData = cacheFinalBarsFromLoader(key, stdCode, kp);
			// 从csv或bin中加载数据
				if (_mode == "csv")
					bHasHisData = cacheRawBarsFromCSV(key, stdCode, kp);
					bHasHisData = cacheRawBarsFromBin(key, stdCode, kp);
		bHasHisData = true;

	if (!bHasHisData)
		return NULL;

	WTSSessionInfo* sInfo = get_session_info(stdCode, true);
	if(sInfo == NULL)
		WTSLogger::error("Cannot find corresponding session of %s", stdCode);
		return NULL;

	bool isClosed = (sInfo->offsetTime(_cur_time) >= sInfo->getCloseTime(true));
	if (realTimes != 1 && !bHasCache)
		std::string rawKey = StrUtil::printf("%s#%s#%u", stdCode, period, 1);
		BarsList& rawBars = _bars_cache[rawKey];
		WTSKlineSlice* rawKline = WTSKlineSlice::create(stdCode, kp, realTimes, &rawBars._bars[0], rawBars._bars.size());

		static WTSDataFactory dataFact;
		WTSKlineData* kData = dataFact.extractKlineData(rawKline, kp, realTimes, sInfo, true);

			BarsList& barsList = _bars_cache[key];
			barsList._code = stdCode;
			barsList._period = kp;
			barsList._times = realTimes;
			barsList._count = kData->size();
			WTSLogger::info("%u resampled %s%u back kline of %s ready", barsList._bars.size(), period, times, stdCode);
			WTSLogger::error("Resampling %s%u back kline of %s failed", period, times, stdCode);
			return NULL;

	BarsList& kBlkPair = _bars_cache[key];
	if (kBlkPair._cursor == UINT_MAX)
		WTSBarStruct bar;
		bar.date = _cur_tdate;
		if(kp != KP_DAY)
			bar.time = (_cur_date - 19900000) * 10000 + _cur_time;
		auto it = std::lower_bound(kBlkPair._bars.begin(), kBlkPair._bars.end(), bar, [isDay, isClosed](const WTSBarStruct& a, const WTSBarStruct& b){
			if (isDay)
				if (!isClosed)
					return a.date < b.date;
					return a.date <= b.date;
				return a.time < b.time;

		uint32_t eIdx = it - kBlkPair._bars.begin();

		if (it != kBlkPair._bars.end())
			WTSBarStruct& curBar = *it;
			if (isDay)
				if (curBar.date >= _cur_tdate && !isClosed)
					if (eIdx > 0)

				else if (curBar.date > _cur_tdate && isClosed)
					if (eIdx > 0)

				if (curBar.time > bar.time)
					if (eIdx > 0)

			kBlkPair._cursor = eIdx + 1;
			return NULL;
		uint32_t curMin = (_cur_date - 19900000) * 10000 + _cur_time;
		if (isDay)
			if (kBlkPair._cursor <= kBlkPair._count)
					while (kBlkPair._bars[kBlkPair._cursor - 1].date < _cur_tdate  && kBlkPair._cursor < kBlkPair._count && kBlkPair._bars[kBlkPair._cursor].date < _cur_tdate)
					while (kBlkPair._bars[kBlkPair._cursor - 1].date <= _cur_tdate  && kBlkPair._cursor < kBlkPair._count && kBlkPair._bars[kBlkPair._cursor].date <= _cur_tdate)

				if (kBlkPair._bars[kBlkPair._cursor - 1].date > _cur_tdate)
			if (kBlkPair._cursor <= kBlkPair._count)
				while (kBlkPair._bars[kBlkPair._cursor-1].time < curMin && kBlkPair._cursor < kBlkPair._count)

				if (kBlkPair._bars[kBlkPair._cursor - 1].time > curMin)

	if (kBlkPair._cursor == 0)
		return NULL;

	uint32_t sIdx = 0;
	if (kBlkPair._cursor > count)
		sIdx = kBlkPair._cursor - count;

	uint32_t realCnt = kBlkPair._cursor - sIdx;
	WTSKlineSlice* kline = WTSKlineSlice::create(stdCode, kp, 1, kBlkPair._bars.data() + sIdx, realCnt);
	return kline;

WTSTickSlice* HisDataReplayer::get_tick_slice(const char* stdCode, uint32_t count, uint64_t etime)
	if (!_tick_enabled)
		return NULL;

	if (!checkTicks(stdCode, _cur_tdate))
		return NULL;

	auto& tickList = _ticks_cache[stdCode];
	if (tickList._cursor == 0)
		return NULL;

	if (tickList._cursor == UINT_MAX)
		uint32_t uDate = _cur_date;
		uint32_t uTime = _cur_time * 100000 + _cur_secs;

		if (etime != 0)
			uDate = (uint32_t)(etime / 10000);
			uTime = (uint32_t)(etime % 10000 * 100000);

		WTSTickStruct curTick;
		curTick.action_date = uDate;
		curTick.action_time = uTime;

		auto tit = std::lower_bound(tickList._items.begin(), tickList._items.end(), curTick, [](const WTSTickStruct& a, const WTSTickStruct& b){
			if (a.action_date != b.action_date)
				return a.action_date < b.action_date;
				return a.action_time < b.action_time;

		if(tit == tickList._items.end())
			tickList._cursor = tickList._items.size();
			uint32_t idx = tit - tickList._items.begin();
			const WTSTickStruct& thisTick = *tit;
			if (thisTick.action_date > uDate || (thisTick.action_date == uDate && thisTick.action_time > uTime))
				if(idx > 0)
					return NULL;

			tickList._cursor = idx + 1;
	uint32_t eIdx = tickList._cursor - 2;
	uint32_t sIdx = 0;
	if (eIdx >= count - 1)
		sIdx = eIdx + 1 - count;

	uint32_t realCnt = eIdx - sIdx + 1;
	if (realCnt == 0)
		return NULL;

	WTSTickSlice* ticks = WTSTickSlice::create(stdCode, tickList._items.data() + sIdx, realCnt);
	return ticks;

WTSOrdDtlSlice* HisDataReplayer::get_order_detail_slice(const char* stdCode, uint32_t count, uint64_t etime /* = 0 */)
	if (!checkOrderDetails(stdCode, _cur_tdate))
		return NULL;

	auto& dataList = _orddtl_cache[stdCode];
	if (dataList._cursor == 0)
		return NULL;

	if (dataList._cursor == UINT_MAX)
		uint32_t uDate = _cur_date;
		uint32_t uTime = _cur_time * 100000 + _cur_secs;

		if (etime != 0)
			uDate = (uint32_t)(etime / 10000);
			uTime = (uint32_t)(etime % 10000 * 100000);

		WTSOrdDtlStruct curItem;
		curItem.action_date = uDate;
		curItem.action_time = uTime;

		auto tit = std::lower_bound(dataList._items.begin(), dataList._items.end(), curItem, [](const WTSOrdDtlStruct& a, const WTSOrdDtlStruct& b) {
			if (a.action_date != b.action_date)
				return a.action_date < b.action_date;
				return a.action_time < b.action_time;

		uint32_t idx = tit - dataList._items.begin();
		dataList._cursor = idx + 1;

	uint32_t eIdx = dataList._cursor - 1;
	uint32_t sIdx = 0;
	if (eIdx >= count - 1)
		sIdx = eIdx + 1 - count;

	uint32_t realCnt = eIdx - sIdx + 1;
	if (realCnt == 0)
		return NULL;

	WTSOrdDtlSlice* ticks = WTSOrdDtlSlice::create(stdCode, dataList._items.data() + sIdx, realCnt);
	return ticks;

WTSOrdQueSlice* HisDataReplayer::get_order_queue_slice(const char* stdCode, uint32_t count, uint64_t etime /* = 0 */)
	if (!checkOrderQueues(stdCode, _cur_tdate))
		return NULL;

	auto& dataList = _ordque_cache[stdCode];
	if (dataList._cursor == 0)
		return NULL;

	if (dataList._cursor == UINT_MAX)
		uint32_t uDate = _cur_date;
		uint32_t uTime = _cur_time * 100000 + _cur_secs;

		if (etime != 0)
			uDate = (uint32_t)(etime / 10000);
			uTime = (uint32_t)(etime % 10000 * 100000);

		WTSOrdQueStruct curItem;
		curItem.action_date = uDate;
		curItem.action_time = uTime;

		auto tit = std::lower_bound(dataList._items.begin(), dataList._items.end(), curItem, [](const WTSOrdQueStruct& a, const WTSOrdQueStruct& b) {
			if (a.action_date != b.action_date)
				return a.action_date < b.action_date;
				return a.action_time < b.action_time;

		uint32_t idx = tit - dataList._items.begin();
		dataList._cursor = idx + 1;

	uint32_t eIdx = dataList._cursor - 1;
	uint32_t sIdx = 0;
	if (eIdx >= count - 1)
		sIdx = eIdx + 1 - count;

	uint32_t realCnt = eIdx - sIdx + 1;
	if (realCnt == 0)
		return NULL;

	WTSOrdQueSlice* ticks = WTSOrdQueSlice::create(stdCode, dataList._items.data() + sIdx, realCnt);
	return ticks;

WTSTransSlice* HisDataReplayer::get_transaction_slice(const char* stdCode, uint32_t count, uint64_t etime /* = 0 */)
	if (!checkTransactions(stdCode, _cur_tdate))
		return NULL;

	auto& dataList = _trans_cache[stdCode];
	if (dataList._cursor == 0)
		return NULL;

	if (dataList._cursor == UINT_MAX)
		uint32_t uDate = _cur_date;
		uint32_t uTime = _cur_time * 100000 + _cur_secs;

		if (etime != 0)
			uDate = (uint32_t)(etime / 10000);
			uTime = (uint32_t)(etime % 10000 * 100000);

		WTSTransStruct curItem;
		curItem.action_date = uDate;
		curItem.action_time = uTime;

		auto tit = std::lower_bound(dataList._items.begin(), dataList._items.end(), curItem, [](const WTSTransStruct& a, const WTSTransStruct& b) {
			if (a.action_date != b.action_date)
				return a.action_date < b.action_date;
				return a.action_time < b.action_time;

		uint32_t idx = tit - dataList._items.begin();
		dataList._cursor = idx + 1;

	uint32_t eIdx = dataList._cursor - 1;
	uint32_t sIdx = 0;
	if (eIdx >= count - 1)
		sIdx = eIdx + 1 - count;

	uint32_t realCnt = eIdx - sIdx + 1;
	if (realCnt == 0)
		return NULL;

	WTSTransSlice* ticks = WTSTransSlice::create(stdCode, dataList._items.data() + sIdx, realCnt);
	return ticks;

bool HisDataReplayer::checkAllTicks(uint32_t uDate)
	bool bHasTick = false;
	for (auto& v : _tick_sub_map)
		bHasTick = bHasTick || checkTicks(v.first.c_str(), uDate);

	return bHasTick;

bool HisDataReplayer::checkOrderDetails(const char* stdCode, uint32_t uDate)
	bool bNeedCache = false;
	auto it = _ticks_cache.find(stdCode);
	if (it == _ticks_cache.end())
		bNeedCache = true;
		auto& tickList = it->second;
		if (tickList._date != uDate)
			bNeedCache = true;

	if (bNeedCache)
		bool hasTicks = false;
		if(NULL != _bt_loader)
			hasTicks = cacheRawTicksFromLoader(stdCode, stdCode, uDate);
		else if (_mode == "csv")
			hasTicks = cacheRawTicksFromCSV(stdCode, stdCode, uDate);
			hasTicks = cacheRawTicksFromBin(stdCode, stdCode, uDate);

		if (!hasTicks)
			return false;

	return true;

bool HisDataReplayer::checkOrderQueues(const char* stdCode, uint32_t uDate)
	bool bNeedCache = false;
	auto it = _ticks_cache.find(stdCode);
	if (it == _ticks_cache.end())
		bNeedCache = true;
		auto& tickList = it->second;
		if (tickList._date != uDate)
			bNeedCache = true;

	if (bNeedCache)
		bool hasTicks = false;
		if (NULL != _bt_loader)
			hasTicks = cacheRawTicksFromLoader(stdCode, stdCode, uDate);
		else if (_mode == "csv")
			hasTicks = cacheRawTicksFromCSV(stdCode, stdCode, uDate);
			hasTicks = cacheRawTicksFromBin(stdCode, stdCode, uDate);

		if (!hasTicks)
			return false;

	return true;

bool HisDataReplayer::checkTransactions(const char* stdCode, uint32_t uDate)
	bool bNeedCache = false;
	auto it = _ticks_cache.find(stdCode);
	if (it == _ticks_cache.end())
		bNeedCache = true;
		auto& tickList = it->second;
		if (tickList._date != uDate)
			bNeedCache = true;

	if (bNeedCache)
		bool hasTicks = false;
		if (_mode == "csv")
			hasTicks = cacheRawTicksFromCSV(stdCode, stdCode, uDate);
			hasTicks = cacheRawTicksFromBin(stdCode, stdCode, uDate);

		if (!hasTicks)
			return false;

	return true;

bool HisDataReplayer::checkTicks(const char* stdCode, uint32_t uDate)
	if (strlen(stdCode) == 0)
		return false;

	bool bNeedCache = false;
	auto it = _ticks_cache.find(stdCode);
	if (it == _ticks_cache.end())
		bNeedCache = true;
		auto& tickList = it->second;
		if (tickList._date != uDate)
			bNeedCache = true;
		else if (tickList._count == 0)
			return false;
	if (bNeedCache)
		bool hasTicks = false;
		if (NULL != _bt_loader)
			hasTicks = cacheRawTicksFromLoader(stdCode, stdCode, uDate);
		else if (_mode == "csv")
			hasTicks = cacheRawTicksFromCSV(stdCode, stdCode, uDate);
			hasTicks = cacheRawTicksFromBin(stdCode, stdCode, uDate);

		if (!hasTicks)
			auto& ticksList = _ticks_cache[stdCode];
			ticksList._cursor = UINT_MAX;
			ticksList._code = stdCode;
			ticksList._date = uDate;
			ticksList._count = 0;
			return false;

	return true;

WTSTickData* HisDataReplayer::get_last_tick(const char* stdCode)
	if (!checkTicks(stdCode, _cur_tdate))
		return NULL;

	auto& tickList = _ticks_cache[stdCode];
	if (tickList._cursor == 0)
		return NULL;

	if (tickList._cursor == UINT_MAX)
		uint32_t uDate = _cur_date;
		uint32_t uTime = _cur_time * 100000 + _cur_secs;

		WTSTickStruct curTick;
		curTick.action_date = uDate;
		curTick.action_time = uTime;

		auto tit = std::lower_bound(tickList._items.begin(), tickList._items.end(), curTick, [](const WTSTickStruct& a, const WTSTickStruct& b){
			if (a.action_date != b.action_date)
				return a.action_date < b.action_date;
				return a.action_time < b.action_time;

		uint32_t idx = tit - tickList._items.begin();
		tickList._cursor = idx + 1;

	return WTSTickData::create(tickList._items[tickList._cursor - 1]);

WTSCommodityInfo* HisDataReplayer::get_commodity_info(const char* stdCode)
	return _bd_mgr.getCommodity(CodeHelper::stdCodeToStdCommID(stdCode).c_str());

WTSSessionInfo* HisDataReplayer::get_session_info(const char* sid, bool isCode /* = false */)
	if (!isCode)
		return _bd_mgr.getSession(sid);

	WTSCommodityInfo* cInfo = _bd_mgr.getCommodity(CodeHelper::stdCodeToStdCommID(sid).c_str());
	if (cInfo == NULL)
		return NULL;

	return _bd_mgr.getSession(cInfo->getSession());

void HisDataReplayer::loadFees(const char* filename)
	if (strlen(filename) == 0)

	if (!StdFile::exists(filename))
		WTSLogger::error("Fees template file %s not exists", filename);

	std::string content;
	StdFile::read_file_content(filename, content);
	if (content.empty())
		WTSLogger::error("Fees template file %s is empty", filename);

	rj::Document root;

	if (root.HasParseError())
		WTSLogger::error("Parsing fees template file %s failed", filename);

	WTSVariant* cfg = WTSVariant::createObject();
	if (!jsonToVariant(root, cfg))
		WTSLogger::error("Converting fees template file %s failed", filename);

	auto keys = cfg->memberNames();
	for (const std::string& key : keys)
		WTSVariant* cfgItem = cfg->get(key.c_str());
		FeeItem& fItem = _fee_map[key];
		fItem._by_volume = cfgItem->getBoolean("byvolume");
		fItem._open = cfgItem->getDouble("open");
		fItem._close = cfgItem->getDouble("close");
		fItem._close_today = cfgItem->getDouble("closetoday");

	WTSLogger::info("%u items of fees template loaded", _fee_map.size());

double HisDataReplayer::calc_fee(const char* stdCode, double price, double qty, uint32_t offset)
	std::string stdPID = CodeHelper::stdCodeToStdCommID(stdCode);
	auto it = _fee_map.find(stdPID);
	if (it == _fee_map.end())
		return 0.0;

	double ret = 0.0;
	WTSCommodityInfo* commInfo = _bd_mgr.getCommodity(stdPID.c_str());
	const FeeItem& fItem = it->second;
	if (fItem._by_volume)
		switch (offset)
		case 0: ret = fItem._open*qty; break;
		case 1: ret = fItem._close*qty; break;
		case 2: ret = fItem._close_today*qty; break;
		default: ret = 0.0; break;
		double amount = price*qty*commInfo->getVolScale();
		switch (offset)
		case 0: ret = fItem._open*amount; break;
		case 1: ret = fItem._close*amount; break;
		case 2: ret = fItem._close_today*amount; break;
		default: ret = 0.0; break;

	return (int32_t)(ret * 100 + 0.5) / 100.0;

double HisDataReplayer::get_cur_price(const char* stdCode)
	auto it = _price_map.find(stdCode);
	if (it == _price_map.end())
		return 0.0;
		return it->second;

void HisDataReplayer::sub_tick(uint32_t sid, const char* stdCode)
	if (strlen(stdCode) == 0)

	SIDSet& sids = _tick_sub_map[stdCode];

void HisDataReplayer::sub_order_detail(uint32_t sid, const char* stdCode)
	if (strlen(stdCode) == 0)

	SIDSet& sids = _orddtl_sub_map[stdCode];

void HisDataReplayer::sub_order_queue(uint32_t sid, const char* stdCode)
	if (strlen(stdCode) == 0)

	SIDSet& sids = _ordque_sub_map[stdCode];

void HisDataReplayer::sub_transaction(uint32_t sid, const char* stdCode)
	if (strlen(stdCode) == 0)

	SIDSet& sids = _trans_sub_map[stdCode];

// 检查未闭合K线
void HisDataReplayer::checkUnbars()
	for(auto& m : _tick_sub_map)					// 遍历tick数据订阅表
		const char* stdCode = m.first.c_str();		// 品种名称
		bool bHasBars = false;						// 
		for (auto& m : _unbars_cache)				// 遍历未订阅的K线缓存
			const std::string& key = m.first;
			auto ay = StrUtil::split(key, "#");
			if (ay[0].compare(stdCode) == 0)		// 若在未订阅K线缓存中找到该品种
				bHasBars = true;

		for (auto& m : _bars_cache)					// 遍历K线缓存
			const std::string& key = m.first;		// 订阅K线中的品种名称
			auto ay = StrUtil::split(key, "#");
			if (ay[0].compare(stdCode) == 0)		// 若在K线缓存中找到该品种
				bHasBars = true;

		if (bHasBars)

		// 如果订阅了tick,但是没有对应的K线数据, 则自动加载1分钟线到内存中
		bool bHasHisData = false;
		std::string key = StrUtil::printf("%s#m#1", stdCode);

		 *	By Wesley @ 2021.12.20
		 *	先从 _bt_loader 加载最终的K线数据
		 *	如果加载不到,再从配置的历史数据存储引擎加载数据
		if (NULL != _bt_loader)
			bHasHisData = cacheFinalBarsFromLoader(key, stdCode, KP_Minute1, false);

		// 这段代码是否有问题
			if (_mode == "csv")
				bHasHisData = cacheRawBarsFromCSV(key, stdCode, KP_Minute1, false);
				bHasHisData = cacheRawBarsFromBin(key, stdCode, KP_Minute1, false);
		if (!bHasHisData)

		WTSSessionInfo* sInfo = get_session_info(stdCode, true);

		BarsList& kBlkPair = _unbars_cache[key];
		WTSBarStruct bar;
		bar.date = _cur_tdate;
		bar.time = (_cur_date - 19900000) * 10000 + _cur_time;

		auto it = std::lower_bound(kBlkPair._bars.begin(), kBlkPair._bars.end(), bar, [](const WTSBarStruct& a, const WTSBarStruct& b) {
			return a.time < b.time;

		uint32_t eIdx = it - kBlkPair._bars.begin();

		if (it != kBlkPair._bars.end())
			WTSBarStruct& curBar = *it;
			if (curBar.time > bar.time)
				if (eIdx > 0)

			kBlkPair._cursor = eIdx + 1;

uint32_t strToTime(const char* strTime, bool bHasSec = false)
	std::string str;
	const char *pos = strTime;
	while (strlen(pos) > 0)
		if (pos[0] != ':')
			str.append(pos, 1);
	uint32_t ret = strtoul(str.c_str(), NULL, 10);
	if (str.size() > 4 && !bHasSec)
		ret /= 100;

	return ret;

uint32_t strToDate(const char* strDate)
	StringVector ay = StrUtil::split(strDate, "/");
	if(ay.size() == 1)
		ay = StrUtil::split(strDate, "-");
	std::stringstream ss;
	if (ay.size() > 1)
		auto pos = ay[2].find(" ");
		if (pos != std::string::npos)
			ay[2] = ay[2].substr(0, pos);
		ss << ay[0] << (ay[1].size() == 1 ? "0" : "") << ay[1] << (ay[2].size() == 1 ? "0" : "") << ay[2];
		ss << ay[0];

	return strtoul(ss.str().c_str(), NULL, 10);

bool HisDataReplayer::cacheRawTicksFromBin(const std::string& key, const char* stdCode, uint32_t uDate)
	CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
	std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
	std::string rawCode = cInfo._code;
	if (cInfo.isHot())
		rawCode = _hot_mgr.getRawCode(cInfo._exchg, cInfo._product, uDate);
	else if(cInfo.isSecond())
		rawCode = _hot_mgr.getSecondRawCode(cInfo._exchg, cInfo._product, uDate);

	std::string filename;
	bool bHit = false;
	// 先检查有没有HOT、SND的主力次主力的tick文件
		const char* hot_flag = cInfo.isFlat() ? "" : (cInfo.isHot() ? "HOT" : "2ND");
		std::stringstream ss;
		ss << _base_dir << "his/ticks/" << cInfo._exchg << "/" << uDate << "/" << cInfo._product << "_" << hot_flag << ".dsb";
		filename = ss.str();
		if (StdFile::exists(filename.c_str()))
			bHit = true;

	// 如果没有找到,则读取分月合约
	if (!bHit)
		std::stringstream ss;
		ss << _base_dir << "his/ticks/" << cInfo._exchg << "/" << uDate << "/" << rawCode << ".dsb";
		filename = ss.str();

	if (!StdFile::exists(filename.c_str()))
		return false;

	std::string content;
	StdFile::read_file_content(filename.c_str(), content);
	if (content.size() < sizeof(HisTickBlock))
		WTSLogger::error("Sizechecking of back tick data file %s failed", filename.c_str());
		return false;

	HisTickBlock* tBlock = (HisTickBlock*)content.c_str();
	HisTickBlockV2* tBlockV2 = NULL;
	if(tBlock->_version == BLOCK_VERSION_CMP)
		// 压缩版本, 要重新检查文件大小
		tBlockV2 = (HisTickBlockV2*)content.c_str();

		if (content.size() != (sizeof(HisTickBlockV2) + tBlockV2->_size))
			// WTSLogger::error("历史Tick数据文件%s大小校验失败", filename.c_str());
			WTSLogger::error("Sizechecking of back tick data file %s failed", filename.c_str());
			return false;

	auto& ticksList = _ticks_cache[key];
	uint32_t tickcnt = 0;
	if (tBlockV2 == NULL)
		tickcnt = (content.size() - sizeof(HisTickBlock)) / sizeof(WTSTickStruct);
		memcpy(ticksList._items.data(), tBlock->_ticks, sizeof(WTSTickStruct)*tickcnt);
		// 需要解压
		std::string buf = WTSCmpHelper::uncompress_data(tBlockV2->_data, (uint32_t)tBlockV2->_size);
		tickcnt = buf.size() / sizeof(WTSTickStruct);
		memcpy(ticksList._items.data(), buf.data(), buf.size());
	ticksList._cursor = UINT_MAX;
	ticksList._code = stdCode;
	ticksList._date = uDate;
	ticksList._count = tickcnt;

	return true;

bool HisDataReplayer::cacheRawTicksFromLoader(const std::string& key, const char* stdCode, uint32_t uDate)
	if (NULL == _bt_loader)
		return false;

	auto& ticksList = _ticks_cache[key];
	ticksList._cursor = UINT_MAX;
	ticksList._code = stdCode;
	ticksList._date = uDate;
	ticksList._count = 0;

	bool bSucc = _bt_loader->loadRawHisTicks(&ticksList, stdCode, uDate, [](void* obj, WTSTickStruct* firstTick, uint32_t count) {
		HftDataList<WTSTickStruct>* ticks = (HftDataList<WTSTickStruct>*)obj;
		ticks->_count = count;
		memcpy(ticks->_items.data(), firstTick, sizeof(WTSTickStruct)*count);

	if (!bSucc)
		return false;

	WTSLogger::info(fmt::format("{} items of back tick data of {} on {} loaded via extended loader", ticksList._count, stdCode, uDate).c_str());

	return true;

bool HisDataReplayer::cacheRawTicksFromCSV(const std::string& key, const char* stdCode, uint32_t uDate)
	if (strlen(stdCode) == 0)
		return false;

	std::stringstream ss;
	ss << _base_dir << "bin/ticks/";
	std::string path = ss.str();
	if (!StdFile::exists(path.c_str()))
	ss << stdCode << "_tick_" << uDate << ".dsb";
	std::string filename = ss.str();
	if (StdFile::exists(filename.c_str()))
		// 如果有格式化的历史数据文件, 则直接读取
		std::string content;
		StdFile::read_file_content(filename.c_str(), content);
		if (content.size() < sizeof(HisTickBlockV2))
			WTSLogger::error("Sizechecking of back tick data file %s failed", filename.c_str());
			return false;

		HisTickBlockV2* tBlock = (HisTickBlockV2*)content.data();
		std::string rawData = WTSCmpHelper::uncompress_data(tBlock->_data, (uint32_t)tBlock->_size);
		uint32_t tickcnt = rawData.size() / sizeof(WTSTickStruct);

		auto& ticksList = _ticks_cache[key];
		memcpy(ticksList._items.data(), rawData.data(), rawData.size());
		ticksList._cursor = UINT_MAX;
		ticksList._code = stdCode;
		ticksList._date = uDate;
		ticksList._count = tickcnt;
		//如果没有格式化的历史数据文件, 则从csv加载
		std::stringstream ss;
		ss << _base_dir << "csv/ticks/" << stdCode << "_tick_" << uDate << ".csv";
		std::string csvfile = ss.str();

		if (!StdFile::exists(csvfile.c_str()))
			WTSLogger::error("Back tick data file %s not exists", csvfile.c_str());
			return false;

		std::ifstream ifs;

		WTSLogger::info("Reading data from %s...", csvfile.c_str());

		char buffer[1024];
		bool headerskipped = false;
		auto& tickList = _ticks_cache[key];
		tickList._code = stdCode;
		tickList._date = uDate;
		while (!ifs.eof())
			ifs.getline(buffer, 1024);
			if (strlen(buffer) == 0)

			if (!headerskipped)
				headerskipped = true;

			StringVector ay = StrUtil::split(buffer, ",");
			WTSTickStruct ticks;
			ticks.action_date = strToDate(ay[0].c_str());
			ticks.action_time = strToTime(ay[1].c_str(), true) * 1000;
			ticks.price = strtod(ay[2].c_str(), NULL);
			ticks.volume = strtoul(ay[3].c_str(), NULL, 10);

			if (tickList._items.size() % 1000 == 0)
				//WTSLogger::info("已读取数据%u条", tickList._items.size());
				WTSLogger::info("%u lines of data loaded", tickList._items.size());
		tickList._count = tickList._items.size();
		//WTSLogger::info("数据文件%s全部读取完成, 共%u条", csvfile.c_str(), tickList._items.size());
		WTSLogger::info("Data file %s all loaded, totally %u items", csvfile.c_str(), tickList._items.size());

		 *	By Wesley @ 2021.12.14
		 *	这一段之前有bug,之前没有把文件头写到文件里,所以转储的dsb解析的时候会抛出异常
		std::string content;
		HisTickBlockV2 *pBlk = (HisTickBlockV2*)content.data();
		strcpy(pBlk->_blk_flag, BLK_FLAG);
		pBlk->_type = BT_HIS_Ticks;
		pBlk->_version = BLOCK_VERSION_CMP;

		std::string cmpData = WTSCmpHelper::compress_data(tickList._items.data(), sizeof(WTSTickStruct)*tickList._count);
		pBlk->_size = cmpData.size();

		StdFile::write_file_content(filename.c_str(), content.c_str(), content.size());
		WTSLogger::info("Ticks transfered to file %s", filename.c_str());

	return true;

// 从外部加载器加载最终历史K线数据, 并存入K线缓存 _bars_cache 或 _unbars_cache
bool HisDataReplayer::cacheFinalBarsFromLoader(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed /* = true */)
	if (NULL == _bt_loader)
		return false;

	// 获取标准合约结构体
	CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
	// 标准品种ID
	std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);

	std::string pname;
	std::string dirname;
	switch (period)
		case KP_Minute1: pname = "m1"; dirname = "min1"; break;
		case KP_Minute5: pname = "m5"; dirname = "min5"; break;
		case KP_DAY: pname = "d"; dirname = "day"; break;
		default: pname = ""; break;

	bool isDay = (period == KP_DAY);

	std::stringstream ss;
	// 文件保存目录eg: storage/his/min5/CFFEX/CFFEX.IF_HOT.dsb
	ss << _base_dir << "his/" << dirname << "/" << cInfo._exchg << "/";
	if (!StdFile::exists(ss.str().c_str()))

	if (cInfo.isHot() && cInfo.isFuture())
		ss << cInfo._exchg << "." << cInfo._product << "_HOT.dsb";
	else if (cInfo.isSecond() && cInfo.isFuture())
		ss << cInfo._exchg << "." << cInfo._product << "_2ND.dsb";
	else if (cInfo.isExright() && cInfo.isStock())

		ss << cInfo._code << ".dsb";
	std::string filename = ss.str();

	bool bHit = false;
	if(_bt_loader->isAutoTrans() && StdFile::exists(filename.c_str()))
		// 如果支持自动转储, 则先读取已经转储的dsb文件
		std::string content;
		StdFile::read_file_content(filename.c_str(), content);
		if (content.size() < sizeof(HisKlineBlockV2))
			WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());
			HisKlineBlockV2* kBlock = (HisKlineBlockV2*)content.c_str();
			std::string rawData = WTSCmpHelper::uncompress_data(kBlock->_data, (uint32_t)kBlock->_size);
			uint32_t barcnt = rawData.size() / sizeof(WTSBarStruct);

			BarsList& barList = bSubbed ? _bars_cache[key] : _unbars_cache[key];
			memcpy(barList._bars.data(), rawData.data(), rawData.size());
			barList._cursor = UINT_MAX;
			barList._code = stdCode;
			barList._period = period;
			barList._count = barcnt;

			uint32_t stime = isDay ? barList._bars[0].date : barList._bars[0].time;
			uint32_t etime = isDay ? barList._bars[barcnt - 1].date : barList._bars[barcnt - 1].time;

			WTSLogger::info(fmt::format("{} items of back {} data of {} directly loaded from dsb file, from {} to {}", barcnt, pname.c_str(), stdCode, stime, etime).c_str());
			bHit = true;

		//如果没有转储的历史数据文件, 则从csv加载
		WTSLogger::info("Reading data via extended loader...");

		BarsList& barList = bSubbed ? _bars_cache[key] : _unbars_cache[key];
		barList._code = stdCode;
		barList._period = period;
		barList._cursor = UINT_MAX;
		barList._count = 0;

		std::string buffer;
		bool bSucc = _bt_loader->loadFinalHisBars(&barList, stdCode, period, [](void* obj, WTSBarStruct* firstBar, uint32_t count) {
			BarsList* bars = (BarsList*)obj;
			bars->_count = count;
			memcpy((void*)bars->_bars.data(), firstBar, sizeof(WTSBarStruct)*count);

		if (!bSucc)
			return false;

		bool isDay = (period == KP_DAY);

		uint32_t stime = isDay ? barList._bars[0].date : barList._bars[0].time;
		uint32_t etime = isDay ? barList._bars[barList._count - 1].date : barList._bars[barList._count - 1].time;

		WTSLogger::info(fmt::format("{} items of back {} data of {} loaded via extended loader, from {} to {}", barList._count, pname.c_str(), stdCode, stime, etime).c_str());

			BlockType btype;
			switch (period)
			case KP_Minute1: btype = BT_HIS_Minute1; break;
			case KP_Minute5: btype = BT_HIS_Minute5; break;
			default: btype = BT_HIS_Day; break;

			 *	By Wesley @ 2021.12.14
			 *	这一段之前有bug,之前没有把文件头写到文件里,所以转储的dsb解析的时候会抛出异常
			std::string content;
			HisKlineBlockV2 *kBlock = (HisKlineBlockV2*)content.data();
			strcpy(kBlock->_blk_flag, BLK_FLAG);
			kBlock->_type = btype;
			kBlock->_version = BLOCK_VERSION_CMP;

			std::string cmpData = WTSCmpHelper::compress_data(barList._bars.data(), sizeof(WTSBarStruct)*barList._count);
			kBlock->_size = cmpData.size();

			StdFile::write_file_content(filename.c_str(), content.c_str(), content.size());
			WTSLogger::info("Bars transfered to file %s", filename.c_str());
	return true;

bool HisDataReplayer::cacheRawBarsFromCSV(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed/* = true*/)
	// 自定义合约结构体
	CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
	// 标准品种ID
	std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);

	std::string pname;
	std::string dirname;
	switch (period)
		case KP_Minute1: pname = "m1"; dirname = "min1"; break;
		case KP_Minute5: pname = "m5"; dirname = "min5"; break;
		case KP_DAY: pname = "d"; dirname = "day"; break;
		default: pname = ""; break;

	bool isDay = (period == KP_DAY);

	std::stringstream ss;
	ss << _base_dir << "his/" << dirname << "/" << cInfo._exchg << "/";

	// 这里自动创建,是因为后面转储需要
	if (!StdFile::exists(ss.str().c_str()))

	if(cInfo.isHot() && cInfo.isFuture())
		ss << cInfo._exchg << "." << cInfo._product << "_HOT.dsb";
	else if (cInfo.isSecond() && cInfo.isFuture())
		ss << cInfo._exchg << "." << cInfo._product << "_2ND.dsb";
	else if (cInfo.isExright() && cInfo.isStock())

		ss << cInfo._code << ".dsb";
	std::string filename = ss.str();
	if (StdFile::exists(filename.c_str()))
		// 如果有格式化的历史数据文件, 则直接读取
		std::string content;
		StdFile::read_file_content(filename.c_str(), content);
		if (content.size() < sizeof(HisKlineBlockV2))
			// WTSLogger::error("历史K线数据文件%s大小校验失败", filename.c_str());
			WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());
			return false;

		HisKlineBlockV2* kBlock = (HisKlineBlockV2*)content.c_str();
		std::string rawData = WTSCmpHelper::uncompress_data(kBlock->_data, (uint32_t)kBlock->_size);
		uint32_t barcnt = rawData.size() / sizeof(WTSBarStruct);

		BarsList& barList = bSubbed ? _bars_cache[key] : _unbars_cache[key];
		memcpy(barList._bars.data(), rawData.data(), rawData.size());
		barList._cursor = UINT_MAX;
		barList._code = stdCode;
		barList._period = period;
		barList._count = barcnt;

		uint32_t stime = isDay ? barList._bars[0].date : barList._bars[0].time;
		uint32_t etime = isDay ? barList._bars[barcnt-1].date : barList._bars[barcnt-1].time;

		WTSLogger::info(fmt::format("{} items of back {} data of {} directly loaded from dsb file, from {} to {}", barcnt, pname.c_str(), stdCode, stime, etime).c_str());
		// 如果没有格式化的历史数据文件, 则从csv加载
		std::stringstream ss;
		ss << _base_dir << "csv/" << stdCode << "_" << pname << ".csv";
		std::string csvfile = ss.str();

		if (!StdFile::exists(csvfile.c_str()))
			WTSLogger::error("Back kbar data file %s not exists", csvfile.c_str());
			return false;

		CsvReader reader;

		WTSLogger::info("Reading data from %s, with fields: %s...", csvfile.c_str(), reader.fields());

		BarsList& barList = bSubbed ? _bars_cache[key] : _unbars_cache[key];
		barList._code = stdCode;
		barList._period = period;
		while (reader.next_row())
			// 逐行读取
			WTSBarStruct bs;
			bs.date = strToDate(reader.get_string("date"));
			if (period != KP_DAY)
				bs.time = TimeUtils::timeToMinBar(bs.date, strToTime(reader.get_string("time")));
			bs.open = reader.get_double("open");
			bs.high = reader.get_double("high");
			bs.low = reader.get_double("low");
			bs.close = reader.get_double("close");
			bs.vol = reader.get_uint32("volume");
			bs.money = reader.get_double("turnover");
			bs.hold = reader.get_uint32("open_interest");
			bs.add = reader.get_int32("diff_interest");
			bs.settle = reader.get_double("settle");

			if (barList._bars.size() % 1000 == 0)
				WTSLogger::info("%u lines of data loaded", barList._bars.size());
		barList._count = barList._bars.size();

		uint32_t stime = isDay ? barList._bars[0].date : barList._bars[0].time;
		uint32_t etime = isDay ? barList._bars[barList._count - 1].date : barList._bars[barList._count - 1].time;

		WTSLogger::info(fmt::format("Data file {} all loaded, totally {} items, from {} to {}", csvfile.c_str(), barList._bars.size(), stime, etime).c_str());

		BlockType btype;
		switch (period)
			case KP_Minute1: btype = BT_HIS_Minute1; break;
			case KP_Minute5: btype = BT_HIS_Minute5; break;
			default: btype = BT_HIS_Day; break;

		 *	By Wesley @ 2021.12.14
		 *	这一段之前有bug,之前没有把文件头写到文件里,所以转储的dsb解析的时候会抛出异常
		std::string content;
		HisKlineBlockV2 *kBlock = (HisKlineBlockV2*)content.data();
		strcpy(kBlock->_blk_flag, BLK_FLAG);
		kBlock->_type = btype;
		kBlock->_version = BLOCK_VERSION_CMP;

		std::string cmpData = WTSCmpHelper::compress_data(barList._bars.data(), sizeof(WTSBarStruct)*barList._count);
		kBlock->_size = cmpData.size();

		StdFile::write_file_content(filename.c_str(), content.c_str(), content.size());
		WTSLogger::info("Bars transfered to file %s", filename.c_str());

	return true;

bool HisDataReplayer::cacheIntegratedFutBars(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed /* = true */)
	CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
	std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);

	uint32_t curDate = TimeUtils::getCurDate();
	uint32_t curTime = TimeUtils::getCurMin() / 100;

	uint32_t endTDate = _bd_mgr.calcTradingDate(stdPID.c_str(), curDate, curTime, false);

	std::string pname;
	switch (period)
	case KP_Minute1: pname = "min1"; break;
	case KP_Minute5: pname = "min5"; break;
	default: pname = "day"; break;

	BarsList& barList = _bars_cache[key];
	barList._code = stdCode;
	barList._period = period;

	std::vector<std::vector<WTSBarStruct>*> barsSections;

	uint32_t realCnt = 0;
	bool isDay = (period == KP_DAY);

	const char* hot_flag = cInfo.isHot() ? "HOT" : "2ND";

	std::vector<WTSBarStruct>* hotAy = NULL;
	uint32_t lastHotTime = 0;
		 *	By Wesley @ 2021.12.20
		 *	本来这里是要先调用_loader->loadRawHisBars从外部加载器读取主力合约数据的
		 *	但是上层会调用一次loadFinalHisBars,这里再调用loadRawHisBars就冗余了,所以直接跳过
		std::stringstream ss;
		ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << cInfo._exchg << "." << cInfo._product << "_" << hot_flag << ".dsb";
		std::string filename = ss.str();
		if (!StdFile::exists(filename.c_str()))

		std::string content;
		StdFile::read_file_content(filename.c_str(), content);
		if (content.size() < sizeof(HisKlineBlock))
			WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());

		HisKlineBlock* kBlock = (HisKlineBlock*)content.c_str();
		std::string buffer;
		if (kBlock->_version == BLOCK_VERSION_CMP)
			if (content.size() < sizeof(HisKlineBlockV2))
				WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());

			HisKlineBlockV2* kBlockV2 = (HisKlineBlockV2*)content.c_str();
			if (kBlockV2->_size == 0)

			buffer = WTSCmpHelper::uncompress_data(kBlockV2->_data, (uint32_t)kBlockV2->_size);
			content.erase(0, sizeof(HisKlineBlock));

		uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
		if (barcnt <= 0)

		hotAy = new std::vector<WTSBarStruct>();
		memcpy(hotAy->data(), buffer.data(), buffer.size());

		if (period != KP_DAY)
			lastHotTime = hotAy->at(barcnt - 1).time;
			lastHotTime = hotAy->at(barcnt - 1).date;

		uint32_t stime = isDay ? barList._bars[0].date : barList._bars[0].time;
		uint32_t etime = isDay ? barList._bars[barcnt - 1].date : barList._bars[barcnt - 1].time;

		WTSLogger::info(fmt::format("{} items of back {} data of hot contract {} directly loaded, from {} to {}", barcnt, pname.c_str(), stdCode, stime, etime).c_str());

	} while (false);

	HotSections secs;
	if (cInfo.isHot())
		if (!_hot_mgr.splitHotSecions(cInfo._exchg, cInfo._product, 19900102, endTDate, secs))
			return false;
	else if (cInfo.isSecond())
		if (!_hot_mgr.splitSecondSecions(cInfo._exchg, cInfo._product, 19900102, endTDate, secs))
			return false;

	if (secs.empty())
		return false;

	bool bAllCovered = false;
	for (auto it = secs.rbegin(); it != secs.rend(); it++)
		const HotSection& hotSec = *it;
		const char* curCode = hotSec._code.c_str();
		uint32_t rightDt = hotSec._e_date;
		uint32_t leftDt = hotSec._s_date;

		WTSBarStruct sBar, eBar;
		if (period != KP_DAY)
			uint64_t sTime = _bd_mgr.getBoundaryTime(stdPID.c_str(), leftDt, false, true);
			uint64_t eTime = _bd_mgr.getBoundaryTime(stdPID.c_str(), rightDt, false, false);

			sBar.date = leftDt;
			sBar.time = ((uint32_t)(sTime / 10000) - 19900000) * 10000 + (uint32_t)(sTime % 10000);

			if (sBar.time < lastHotTime)	//如果边界时间小于主力的最后一根Bar的时间, 说明已经有交叉了, 则不需要再处理了
				bAllCovered = true;
				sBar.time = lastHotTime + 1;

			eBar.date = rightDt;
			eBar.time = ((uint32_t)(eTime / 10000) - 19900000) * 10000 + (uint32_t)(eTime % 10000);

			if (eBar.time <= lastHotTime)	//右边界时间小于最后一条Hot时间, 说明全部交叉了, 没有再找的必要了
			sBar.date = leftDt;
			if (sBar.date < lastHotTime)	//如果边界时间小于主力的最后一根Bar的时间, 说明已经有交叉了, 则不需要再处理了
				bAllCovered = true;
				sBar.date = lastHotTime + 1;

			eBar.date = rightDt;

			if (eBar.date <= lastHotTime)

		 *	By Wesley @ 2021.12.20
		 *	先从extloader读取分月合约的K线数据
		 *	如果没有读到,再从文件读取
		bool bLoaded = false;
		std::string buffer;
		if (NULL != _bt_loader)
			std::string wCode = StrUtil::printf("%s.%s.%s", cInfo._exchg, cInfo._product, (char*)curCode + strlen(cInfo._product));
			bLoaded = _bt_loader->loadRawHisBars(&buffer, wCode.c_str(), period, [](void* obj, WTSBarStruct* bars, uint32_t count) {
				std::string* buff = (std::string*)obj;
				memcpy((void*)buff->c_str(), bars, sizeof(WTSBarStruct)*count);


		if (!bLoaded)
			std::stringstream ss;
			ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << curCode << ".dsb";
			std::string filename = ss.str();
			if (!StdFile::exists(filename.c_str()))

			std::string content;
			StdFile::read_file_content(filename.c_str(), content);
			if (content.size() < sizeof(HisKlineBlock))
				WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());
				return false;

			HisKlineBlock* kBlock = (HisKlineBlock*)content.c_str();
			WTSBarStruct* firstBar = NULL;
			uint32_t barcnt = 0;
			if (kBlock->_version == BLOCK_VERSION_CMP)
				if (content.size() < sizeof(HisKlineBlockV2))
					WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());

				HisKlineBlockV2* kBlockV2 = (HisKlineBlockV2*)content.c_str();
				if (kBlockV2->_size == 0)

				buffer = WTSCmpHelper::uncompress_data(kBlockV2->_data, (uint32_t)kBlockV2->_size);
				content.erase(0, sizeof(HisKlineBlock));

		uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
		if (barcnt <= 0)

		WTSBarStruct* firstBar = (WTSBarStruct*)buffer.data();

		WTSBarStruct* pBar = std::lower_bound(firstBar, firstBar + (barcnt - 1), sBar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
			if (period == KP_DAY)
				return a.date < b.date;
				return a.time < b.time;

		uint32_t sIdx = pBar - firstBar;
		if ((period == KP_DAY && pBar->date < sBar.date) || (period != KP_DAY && pBar->time < sBar.time))	//早于边界时间
			//早于边界时间, 说明没有数据了, 因为lower_bound会返回大于等于目标位置的数据

		pBar = std::lower_bound(firstBar + sIdx, firstBar + (barcnt - 1), eBar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
			if (period == KP_DAY)
				return a.date < b.date;
				return a.time < b.time;
		uint32_t eIdx = pBar - firstBar;
		if ((period == KP_DAY && pBar->date > eBar.date) || (period != KP_DAY && pBar->time > eBar.time))

		if (eIdx < sIdx)

		uint32_t curCnt = eIdx - sIdx + 1;
		std::vector<WTSBarStruct>* tempAy = new std::vector<WTSBarStruct>();
		memcpy(tempAy->data(), &firstBar[sIdx], sizeof(WTSBarStruct)*curCnt);
		realCnt += curCnt;


		if (bAllCovered)

	if (hotAy)
		realCnt += hotAy->size();

	if (realCnt > 0)

		uint32_t curIdx = 0;
		for (auto it = barsSections.rbegin(); it != barsSections.rend(); it++)
			std::vector<WTSBarStruct>* tempAy = *it;
			memcpy(barList._bars.data() + curIdx, tempAy->data(), tempAy->size() * sizeof(WTSBarStruct));
			curIdx += tempAy->size();
			delete tempAy;

	WTSLogger::info("%u items of back %s data of %s cached", realCnt, pname.c_str(), stdCode);

	return true;

const HisDataReplayer::AdjFactorList& HisDataReplayer::getAdjFactors(const char* code, const char* exchg, const char* pid /* = "" */)
	char key[20] = { 0 };
	sprintf(key, "%s.%s.%s", exchg, pid, code);

	auto it = _adj_factors.find(key);
	if (it == _adj_factors.end())
		//By Wesley @ 2021.12.21
		if (_bt_loader)
            WTSLogger::info("No adjusting factors of %s cached, searching via extented loader...", key);
			_bt_loader->loadAdjFactors(this, key, [](void* obj, const char* stdCode, uint32_t* dates, double* factors, uint32_t count) {
				HisDataReplayer* self = (HisDataReplayer*)obj;
				AdjFactorList& fctrLst = self->_adj_factors[stdCode];

				for (uint32_t i = 0; i < count; i++)
					AdjFactor adjFact;
					adjFact._date = dates[i];
					adjFact._factor = factors[i];


				AdjFactor adjFact;
				adjFact._date = 19900101;
				adjFact._factor = 1;

				std::sort(fctrLst.begin(), fctrLst.end(), [](const AdjFactor& left, const AdjFactor& right) {
					return left._date < right._date;

                WTSLogger::info("%u items of adjusting factors of %s loaded via extended loader", count, stdCode);

	return _adj_factors[key];

bool HisDataReplayer::cacheAdjustedStkBars(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed /* = true */)
	CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
	std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);

	uint32_t curDate = TimeUtils::getCurDate();
	uint32_t curTime = TimeUtils::getCurMin() / 100;

	uint32_t endTDate = _bd_mgr.calcTradingDate(stdPID.c_str(), curDate, curTime, false);

	std::string pname;
	switch (period)
	case KP_Minute1: pname = "min1"; break;
	case KP_Minute5: pname = "min5"; break;
	default: pname = "day"; break;

	BarsList& barList = _bars_cache[key];
	barList._code = stdCode;
	barList._period = period;

	std::vector<std::vector<WTSBarStruct>*> barsSections;

	uint32_t realCnt = 0;

	std::vector<WTSBarStruct>* adjustedBars = NULL;
	uint32_t lastQTime = 0;

	WTSLogger::info("Loading adjusted bars of %s...", stdCode);
		// 先直接读取复权过的历史数据,路径如/his/day/sse/SH600000Q.dsb

		 *	By Wesley @ 2021.12.20
		 *	本来这里是要先调用_loader->loadRawHisBars从外部加载器读取主力合约数据的
		 *	但是上层会调用一次loadFinalHisBars,这里再调用loadRawHisBars就冗余了,所以直接跳过
		std::stringstream ss;
		ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << cInfo._code << (cInfo._exright == 1 ? "Q" : "H") << ".dsb";
		std::string filename = ss.str();
		if (!StdFile::exists(filename.c_str()))

		std::string content;
		StdFile::read_file_content(filename.c_str(), content);
		if (content.size() < sizeof(HisKlineBlock))
			WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());

		HisKlineBlock* kBlock = (HisKlineBlock*)content.c_str();
		std::string buffer;
		if (kBlock->_version == BLOCK_VERSION_CMP)
			if (content.size() < sizeof(HisKlineBlockV2))
				WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());

			HisKlineBlockV2* kBlockV2 = (HisKlineBlockV2*)content.c_str();
			if (kBlockV2->_size == 0)

			buffer = WTSCmpHelper::uncompress_data(kBlockV2->_data, (uint32_t)kBlockV2->_size);
			content.erase(0, sizeof(HisKlineBlock));

		uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
		if (barcnt <= 0)

		adjustedBars = new std::vector<WTSBarStruct>();
		memcpy(adjustedBars->data(), buffer.data(), buffer.size());

		if (period != KP_DAY)
			lastQTime = adjustedBars->at(barcnt - 1).time;
			lastQTime = adjustedBars->at(barcnt - 1).date;

		WTSLogger::info("%u items of adjusted back %s data of stock %s directly loaded", barcnt, pname.c_str(), stdCode);
	} while (false);

	WTSLogger::info("Loading raw bars of %s...", stdCode);
		const char* curCode = cInfo._code;

		WTSBarStruct sBar;
		if (period != KP_DAY)
			sBar.date = TimeUtils::minBarToDate(lastQTime);

			sBar.time = lastQTime + 1;
			sBar.date = lastQTime + 1;

		 *	By Wesley @ 2021.12.20
		 *	先从extloader读取未复权K线数据
		 *	如果没有读到,再从文件读取
		bool bLoaded = false;
		std::string buffer;
		if (NULL != _bt_loader)
			std::string wCode = StrUtil::printf("%s.%s.%s", cInfo._exchg, cInfo._product, curCode);
			bLoaded = _bt_loader->loadRawHisBars(&buffer, wCode.c_str(), period, [](void* obj, WTSBarStruct* bars, uint32_t count) {
				std::string* buff = (std::string*)obj;
				memcpy((void*)buff->c_str(), bars, sizeof(WTSBarStruct)*count);

			if (bLoaded)
				WTSLogger::debug("Raw bars of %s loaded via extended loader", stdCode);

		if (!bLoaded)
			std::stringstream ss;
			ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << curCode << ".dsb";
			std::string filename = ss.str();
			if (!StdFile::exists(filename.c_str()))

			std::string content;
			StdFile::read_file_content(filename.c_str(), content);
			if (content.size() < sizeof(HisKlineBlock))
				WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());
				return false;

			HisKlineBlock* kBlock = (HisKlineBlock*)content.c_str();
			WTSBarStruct* firstBar = NULL;
			uint32_t barcnt = 0;
			if (kBlock->_version == BLOCK_VERSION_CMP)
				if (content.size() < sizeof(HisKlineBlockV2))
					WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());

				HisKlineBlockV2* kBlockV2 = (HisKlineBlockV2*)content.c_str();
				if (kBlockV2->_size == 0)

				buffer = WTSCmpHelper::uncompress_data(kBlockV2->_data, (uint32_t)kBlockV2->_size);
				content.erase(0, sizeof(HisKlineBlock));

			WTSLogger::debug("Raw bars of %s loaded from dsb file", stdCode);

		uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
		if (barcnt <= 0)

		WTSBarStruct* firstBar = (WTSBarStruct*)buffer.data();

		WTSBarStruct* pBar = std::lower_bound(firstBar, firstBar + (barcnt - 1), sBar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
			if (period == KP_DAY)
				return a.date < b.date;
				return a.time < b.time;

		if (pBar != NULL)
			uint32_t sIdx = pBar - firstBar;
			uint32_t curCnt = barcnt - sIdx;
			std::vector<WTSBarStruct>* tempAy = new std::vector<WTSBarStruct>();
			memcpy(tempAy->data(), &firstBar[sIdx], sizeof(WTSBarStruct)*curCnt);
			realCnt += curCnt;

			auto& ayFactors = getAdjFactors(cInfo._code, cInfo._exchg, cInfo._product);
			if (!ayFactors.empty())
				WTSLogger::info("Adjusting bars of %s with adjusting factors...", stdCode);
				int32_t lastIdx = curCnt;
				WTSBarStruct bar;
				firstBar = tempAy->data();

				double baseFactor = 1.0;
				if (cInfo._exright == 1)
					baseFactor = ayFactors.back()._factor;
				else if (cInfo._exright == 2)
					barList._factor = ayFactors.back()._factor;

				for (auto it = ayFactors.rbegin(); it != ayFactors.rend(); it++)
					const AdjFactor& adjFact = *it;
					bar.date = adjFact._date;

					double factor = adjFact._factor / baseFactor;

					WTSBarStruct* pBar = NULL;
					pBar = std::lower_bound(firstBar, firstBar + lastIdx - 1, bar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
						return a.date < b.date;

					if (pBar->date < bar.date)

					WTSBarStruct* endBar = pBar;
					if (pBar != NULL)
						int32_t curIdx = pBar - firstBar;
						while (pBar && curIdx < lastIdx)
							pBar->open *= factor;
							pBar->high *= factor;
							pBar->low *= factor;
							pBar->close *= factor;

						lastIdx = endBar - firstBar;

					if (lastIdx == 0)
				WTSLogger::info("No adjusting factors of %s found, ajusting task skipped...", stdCode);

	} while (false);

	if (adjustedBars)
		realCnt += adjustedBars->size();

	if (realCnt > 0)

		uint32_t curIdx = 0;
		for (auto it = barsSections.rbegin(); it != barsSections.rend(); it++)
			std::vector<WTSBarStruct>* tempAy = *it;
			memcpy(barList._bars.data() + curIdx, tempAy->data(), tempAy->size() * sizeof(WTSBarStruct));
			curIdx += tempAy->size();
			delete tempAy;

	WTSLogger::info("%u items of back %s data of %s cached", realCnt, pname.c_str(), stdCode);

	return true;

bool HisDataReplayer::cacheRawBarsFromBin(const std::string& key, const char* stdCode, WTSKlinePeriod period, bool bSubbed/* = true*/)
	CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
	std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);

	uint32_t curDate = TimeUtils::getCurDate();
	uint32_t curTime = TimeUtils::getCurMin() / 100;

	uint32_t endTDate = _bd_mgr.calcTradingDate(stdPID.c_str(), curDate, curTime, false);

	std::string pname;
	switch (period)
	case KP_Minute1: pname = "min1"; break;
	case KP_Minute5: pname = "min5"; break;
	default: pname = "day"; break;

	bool isDay = (period == KP_DAY);

	BarsList& barList = bSubbed ? _bars_cache[key] : _unbars_cache[key];
	barList._code = stdCode;
	barList._period = period;

	std::vector<std::vector<WTSBarStruct>*> barsSections;

	uint32_t realCnt = 0;
	if (!cInfo.isFlat() && cInfo.isFuture())		// 如果是读取期货主力连续数据
		return cacheIntegratedFutBars(key, stdCode, period);
	else if (cInfo.isExright() && cInfo.isStock())	// 如果是读取股票复权数据
		return cacheAdjustedStkBars(key, stdCode, period, bSubbed);
	 *	By Wesley @ 2021.12.20
	 *	先从extloader读取
	 *	如果没有读到,再从文件读取
	bool bLoaded = false;
	std::string buffer;
	if (NULL != _bt_loader)
		bLoaded = _bt_loader->loadRawHisBars(&buffer, stdCode, period, [](void* obj, WTSBarStruct* bars, uint32_t count) {
			std::string* buff = (std::string*)obj;
			memcpy((void*)buff->c_str(), bars, sizeof(WTSBarStruct)*count);
		std::stringstream ss;
		ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << cInfo._code << ".dsb";
		std::string filename = ss.str();
		if (StdFile::exists(filename.c_str()))
			//如果有格式化的历史数据文件, 则直接读取
			std::string content;
			StdFile::read_file_content(filename.c_str(), content);
			if (content.size() < sizeof(HisKlineBlock))
				WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());
				return false;

			HisKlineBlock* kBlock = (HisKlineBlock*)content.c_str();
			WTSBarStruct* firstBar = NULL;
			if (kBlock->_version == BLOCK_VERSION_CMP)
				if (content.size() < sizeof(HisKlineBlockV2))
					WTSLogger::error("Sizechecking of back kbar data file %s failed", filename.c_str());
					return false;

				HisKlineBlockV2* kBlockV2 = (HisKlineBlockV2*)content.c_str();
				if (kBlockV2->_size == 0)
					return false;

				buffer = WTSCmpHelper::uncompress_data(kBlockV2->_data, (uint32_t)kBlockV2->_size);
				content.erase(0, sizeof(HisKlineBlock));

	uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
	if (barcnt <= 0)
		return false;

	WTSBarStruct* firstBar = (WTSBarStruct*)buffer.data();
	if (barcnt > 0)

		uint32_t sIdx = 0;
		uint32_t idx = barcnt - 1;
		uint32_t curCnt = (idx - sIdx + 1);

		std::vector<WTSBarStruct>* tempAy = new std::vector<WTSBarStruct>();
		memcpy(tempAy->data(), &firstBar[sIdx], sizeof(WTSBarStruct)*curCnt);
		realCnt += curCnt;


	if (realCnt > 0)

		uint32_t curIdx = 0;
		for (auto it = barsSections.rbegin(); it != barsSections.rend(); it++)
			std::vector<WTSBarStruct>* tempAy = *it;
			memcpy(barList._bars.data() + curIdx, tempAy->data(), tempAy->size()*sizeof(WTSBarStruct));
			curIdx += tempAy->size();
			delete tempAy;
		barList._count = barList._bars.size();

	WTSLogger::info("%u items of back %s data of %s cached", realCnt, pname.c_str(), stdCode);
	return true;