WtDataReader.h
source: wtcpp/folder98/folder08/folder1/file03.md
#pragma once
#include <string>
#include <stdint.h>
#include "DataDefine.h"
#include "../Includes/FasterDefs.h"
#include "../Includes/IDataReader.h"
#include "../Share/BoostMappingFile.hpp"
NS_WTP_BEGIN
typedef std::shared_ptr<BoostMappingFile> BoostMFPtr;
class WtDataReader : public IDataReader
{
public:
WtDataReader();
virtual ~WtDataReader();
private:
typedef struct _RTKBlockPair
{
RTKlineBlock* _block;
BoostMFPtr _file;
uint64_t _last_cap;
_RTKBlockPair()
{
_block = NULL;
_file = NULL;
_last_cap = 0;
}
} RTKlineBlockPair;
typedef faster_hashmap<std::string, RTKlineBlockPair> RTKBlockFilesMap; // K线数据块映射
typedef struct _TBlockPair
{
RTTickBlock* _block;
BoostMFPtr _file;
uint64_t _last_cap;
_TBlockPair()
{
_block = NULL;
_file = NULL;
_last_cap = 0;
}
} TickBlockPair;
typedef faster_hashmap<std::string, TickBlockPair> TBlockFilesMap; // tick数据块映射
typedef struct _TransBlockPair
{
RTTransBlock* _block;
BoostMFPtr _file;
uint64_t _last_cap;
std::shared_ptr< std::ofstream> _fstream;
_TransBlockPair()
{
_block = NULL;
_file = NULL;
_last_cap = 0;
}
} TransBlockPair;
typedef faster_hashmap<std::string, TransBlockPair> TransBlockFilesMap; // 交易数据块映射
typedef struct _OdrDtlBlockPair
{
RTOrdDtlBlock* _block;
BoostMFPtr _file;
uint64_t _last_cap;
std::shared_ptr< std::ofstream> _fstream;
_OdrDtlBlockPair()
{
_block = NULL;
_file = NULL;
_last_cap = 0;
}
} OrdDtlBlockPair;
typedef faster_hashmap<std::string, OrdDtlBlockPair> OrdDtlBlockFilesMap; // 订单数据块映射
typedef struct _OdrQueBlockPair
{
RTOrdQueBlock* _block;
BoostMFPtr _file;
uint64_t _last_cap;
std::shared_ptr< std::ofstream> _fstream;
_OdrQueBlockPair()
{
_block = NULL;
_file = NULL;
_last_cap = 0;
}
} OrdQueBlockPair;
typedef faster_hashmap<std::string, OrdQueBlockPair> OrdQueBlockFilesMap; // 订单队列数据块映射
RTKBlockFilesMap _rt_min1_map;
RTKBlockFilesMap _rt_min5_map;
TBlockFilesMap _rt_tick_map;
TransBlockFilesMap _rt_trans_map;
OrdDtlBlockFilesMap _rt_orddtl_map;
OrdQueBlockFilesMap _rt_ordque_map;
typedef struct _HisTBlockPair
{
HisTickBlock* _block;
uint64_t _date;
std::string _buffer;
_HisTBlockPair()
{
_block = NULL;
_date = 0;
_buffer.clear();
}
} HisTBlockPair;
typedef faster_hashmap<std::string, HisTBlockPair> HisTickBlockMap; // 历史tick数据块映射
typedef struct _HisTransBlockPair
{
HisTransBlock* _block;
uint64_t _date;
std::string _buffer;
_HisTransBlockPair()
{
_block = NULL;
_date = 0;
_buffer.clear();
}
} HisTransBlockPair;
typedef faster_hashmap<std::string, HisTransBlockPair> HisTransBlockMap; // 历史交易数据块映射
typedef struct _HisOrdDtlBlockPair
{
HisOrdDtlBlock* _block;
uint64_t _date;
std::string _buffer;
_HisOrdDtlBlockPair()
{
_block = NULL;
_date = 0;
_buffer.clear();
}
} HisOrdDtlBlockPair;
typedef faster_hashmap<std::string, HisOrdDtlBlockPair> HisOrdDtlBlockMap; // 历史订单数据块映射
typedef struct _HisOrdQueBlockPair
{
HisOrdQueBlock* _block;
uint64_t _date;
std::string _buffer;
_HisOrdQueBlockPair()
{
_block = NULL;
_date = 0;
_buffer.clear();
}
} HisOrdQueBlockPair;
typedef faster_hashmap<std::string, HisOrdQueBlockPair> HisOrdQueBlockMap; // 历史订单队列数据块映射
HisTickBlockMap _his_tick_map;
HisOrdDtlBlockMap _his_orddtl_map;
HisOrdQueBlockMap _his_ordque_map;
HisTransBlockMap _his_trans_map;
private:
// 从本地 .dmb 文件中获取映射的数据块
RTKlineBlockPair* getRTKilneBlock(const char* exchg, const char* code, WTSKlinePeriod period);
TickBlockPair* getRTTickBlock(const char* exchg, const char* code);
OrdQueBlockPair* getRTOrdQueBlock(const char* exchg, const char* code);
OrdDtlBlockPair* getRTOrdDtlBlock(const char* exchg, const char* code);
TransBlockPair* getRTTransBlock(const char* exchg, const char* code);
bool cacheIntegratedFutBars(const std::string& key, const char* stdCode, WTSKlinePeriod period);
bool cacheAdjustedStkBars(const std::string& key, const char* stdCode, WTSKlinePeriod period);
/*
* 将历史数据放入缓存
*/
bool cacheHisBarsFromFile(const std::string& key, const char* stdCode, WTSKlinePeriod period);
bool cacheFinalBarsFromLoader(const std::string& key, const char* stdCode, WTSKlinePeriod period);
bool loadStkAdjFactorsFromFile(const char* adjfile);
bool loadStkAdjFactorsFromLoader();
public:
virtual void init(WTSVariant* cfg, IDataReaderSink* sink, IHisDataLoader* loader = NULL) override;
// K线闭合
virtual void onMinuteEnd(uint32_t uDate, uint32_t uTime, uint32_t endTDate = 0) override;
// 获取数据切片
virtual WTSTickSlice* readTickSlice(const char* stdCode, uint32_t count, uint64_t etime = 0) override;
virtual WTSOrdDtlSlice* readOrdDtlSlice(const char* stdCode, uint32_t count, uint64_t etime = 0) override;
virtual WTSOrdQueSlice* readOrdQueSlice(const char* stdCode, uint32_t count, uint64_t etime = 0) override;
virtual WTSTransSlice* readTransSlice(const char* stdCode, uint32_t count, uint64_t etime = 0) override;
virtual WTSKlineSlice* readKlineSlice(const char* stdCode, WTSKlinePeriod period, uint32_t count, uint64_t etime = 0) override;
virtual double getAdjFactorByDate(const char* stdCode, uint32_t date = 0) override;
private:
std::string _base_dir; // 数据保存路径
IBaseDataMgr* _base_data_mgr; // 基础数据管理器
IHotMgr* _hot_mgr; // 主力合约管理器
typedef struct _BarsList
{
std::string _exchg;
std::string _code;
WTSKlinePeriod _period;
uint32_t _rt_cursor;
std::string _raw_code;
std::vector<WTSBarStruct> _bars;
double _factor;
_BarsList() :_rt_cursor(UINT_MAX), _factor(DBL_MAX){}
} BarsList;
typedef faster_hashmap<std::string, BarsList> BarsCache;
BarsCache _bars_cache; // bar数据缓存
uint64_t _last_time;
//除权因子
typedef struct _AdjFactor
{
uint32_t _date;
double _factor;
} AdjFactor;
typedef std::vector<AdjFactor> AdjFactorList;
typedef faster_hashmap<std::string, AdjFactorList> AdjFactorMap;
AdjFactorMap _adj_factors;
const AdjFactorList& getAdjFactors(const char* code, const char* exchg, const char* pid);
};
NS_WTP_END
WtDataReader.cpp
#include "WtDataReader.h"
#include "../Includes/WTSVariant.hpp"
#include "../Share/TimeUtils.hpp"
#include "../Share/CodeHelper.hpp"
#include "../Share/StdUtils.hpp"
#include "../Includes/WTSContractInfo.hpp"
#include "../Includes/IBaseDataMgr.h"
#include "../Includes/IHotMgr.h"
#include "../Includes/WTSDataDef.hpp"
#include "../WTSUtils/WTSCmpHelper.hpp"
#include "../WTSUtils/WTSCfgLoader.h"
#include <rapidjson/document.h>
namespace rj = rapidjson;
//By Wesley @ 2022.01.05
#include "../Share/fmtlib.h"
template<typename... Args>
inline void pipe_reader_log(IDataReaderSink* sink, WTSLogLevel ll, const char* format, const Args&... args)
{
if (sink == NULL)
return;
static thread_local char buffer[512] = { 0 };
memset(buffer, 0, 512);
fmt::format_to(buffer, format, args...);
sink->reader_log(ll, buffer);
}
extern "C"
{
EXPORT_FLAG IDataReader* createDataReader()
{
IDataReader* ret = new WtDataReader();
return ret;
}
EXPORT_FLAG void deleteDataReader(IDataReader* reader)
{
if (reader != NULL)
delete reader;
}
};
/*
* 处理块数据
*/
bool proc_block_data(std::string& content, bool isBar, bool bKeepHead /* = true */)
{
BlockHeader* header = (BlockHeader*)content.data();
bool bCmped = header->is_compressed();
bool bOldVer = header->is_old_version();
//如果既没有压缩,也不是老版本结构体,则直接返回
if (!bCmped && !bOldVer)
{
if (!bKeepHead)
content.erase(0, BLOCK_HEADER_SIZE);
return true;
}
std::string buffer;
if (bCmped)
{
BlockHeaderV2* blkV2 = (BlockHeaderV2*)content.c_str();
if (content.size() != (sizeof(BlockHeaderV2) + blkV2->_size))
{
return false;
}
//将文件头后面的数据进行解压
buffer = WTSCmpHelper::uncompress_data(content.data() + BLOCK_HEADERV2_SIZE, (uint32_t)blkV2->_size);
}
else
{
if (!bOldVer)
{
//如果不是老版本,直接返回
if (!bKeepHead)
content.erase(0, BLOCK_HEADER_SIZE);
return true;
}
else
{
buffer.append(content.data() + BLOCK_HEADER_SIZE, content.size() - BLOCK_HEADER_SIZE);
}
}
if (bOldVer)
{
if (isBar)
{
std::string bufV2;
uint32_t barcnt = buffer.size() / sizeof(WTSBarStructOld);
bufV2.resize(barcnt * sizeof(WTSBarStruct));
WTSBarStruct* newBar = (WTSBarStruct*)bufV2.data();
WTSBarStructOld* oldBar = (WTSBarStructOld*)buffer.data();
for (uint32_t idx = 0; idx < barcnt; idx++)
{
newBar[idx] = oldBar[idx];
}
buffer.swap(bufV2);
}
else
{
uint32_t tick_cnt = buffer.size() / sizeof(WTSTickStructOld);
std::string bufv2;
bufv2.resize(sizeof(WTSTickStruct)*tick_cnt);
WTSTickStruct* newTick = (WTSTickStruct*)bufv2.data();
WTSTickStructOld* oldTick = (WTSTickStructOld*)buffer.data();
for (uint32_t i = 0; i < tick_cnt; i++)
{
newTick[i] = oldTick[i];
}
buffer.swap(bufv2);
}
}
if (bKeepHead)
{
content.resize(BLOCK_HEADER_SIZE);
content.append(buffer);
header = (BlockHeader*)content.data();
header->_version = BLOCK_VERSION_RAW_V2;
}
else
{
content.swap(buffer);
}
return true;
}
WtDataReader::WtDataReader()
: _last_time(0)
, _base_data_mgr(NULL)
, _hot_mgr(NULL)
{
}
WtDataReader::~WtDataReader()
{
}
void WtDataReader::init(WTSVariant* cfg, IDataReaderSink* sink, IHisDataLoader* loader /* = NULL */)
{
// 初始化数据读取接口
IDataReader::init(cfg, sink, loader);
// 基础数据管理器/主力合约管理器
_base_data_mgr = sink->get_basedata_mgr();
_hot_mgr = sink->get_hot_mgr();
if (cfg == NULL)
return ;
// 获取文件存储路径
_base_dir = cfg->getCString("path");
_base_dir = StrUtil::standardisePath(_base_dir);
pipe_reader_log(sink, LL_DEBUG, "Storage initialized @ {}", _base_dir);
/*
* By Wesley @ 2021.12.20
* 先从extloader加载除权因子
* 如果加载失败,并且配置了除权因子文件,再加载除权因子文件
*/
bool bLoaded = loadStkAdjFactorsFromLoader();
if (!bLoaded && cfg->has("adjfactor"))
loadStkAdjFactorsFromFile(cfg->getCString("adjfactor"));
else
pipe_reader_log(sink, LL_INFO, "No adjusting factor file configured, loading skipped");
}
// 加载除权因子
bool WtDataReader::loadStkAdjFactorsFromLoader()
{
if (NULL == _loader)
return false;
bool ret = _loader->loadAllAdjFactors(&_adj_factors, [](void* obj, const char* stdCode, uint32_t* dates, double* factors, uint32_t count) {
AdjFactorMap* fact_map = (AdjFactorMap*)obj;
AdjFactorList& fctrLst = (*fact_map)[stdCode];
for(uint32_t i = 0; i < count; i++)
{
AdjFactor adjFact;
adjFact._date = dates[i];
adjFact._factor = factors[i];
fctrLst.emplace_back(adjFact);
}
//一定要把第一条加进去,不然如果是前复权的话,可能会漏处理最早的数据
AdjFactor adjFact;
adjFact._date = 19900101;
adjFact._factor = 1;
fctrLst.emplace_back(adjFact);
std::sort(fctrLst.begin(), fctrLst.end(), [](const AdjFactor& left, const AdjFactor& right) {
return left._date < right._date;
});
});
if (ret && _sink) pipe_reader_log(_sink,LL_INFO, "Adjusting factors of {} contracts loaded via extended loader", _adj_factors.size());
return ret;
}
bool WtDataReader::loadStkAdjFactorsFromFile(const char* adjfile)
{
if(!StdFile::exists(adjfile))
{
pipe_reader_log(_sink,LL_ERROR, "Adjusting factors file {} not exists", adjfile);
return false;
}
WTSVariant* doc = WTSCfgLoader::load_from_file(adjfile, true);
if(doc == NULL)
{
pipe_reader_log(_sink, LL_ERROR, "Loading adjusting factors file {} failed", adjfile);
return false;
}
uint32_t stk_cnt = 0;
uint32_t fct_cnt = 0;
for (const std::string& exchg : doc->memberNames())
{
WTSVariant* itemExchg = doc->get(exchg);
for(const std::string& code : itemExchg->memberNames())
{
WTSVariant* ayFacts = itemExchg->get(code);
if(!ayFacts->isArray() )
continue;
/*
* By Wesley @ 2021.12.21
* 先检查code的格式是不是包含PID,如STK.600000
* 如果包含PID,则直接格式化,如果不包含,则强制为STK
*/
bool bHasPID = (code.find('.') != std::string::npos);
std::string key;
if (bHasPID)
key = StrUtil::printf("%s.%s", exchg, code);
else
key = StrUtil::printf("%s.STK.s", exchg, code);
stk_cnt++;
AdjFactorList& fctrLst = _adj_factors[key];
for (uint32_t i = 0; i < ayFacts->size(); i++)
{
WTSVariant* fItem = ayFacts->get(i);
AdjFactor adjFact;
adjFact._date = fItem->getUInt32("date");
adjFact._factor = fItem->getDouble("factor");
fctrLst.emplace_back(adjFact);
fct_cnt++;
}
//一定要把第一条加进去,不然如果是前复权的话,可能会漏处理最早的数据
AdjFactor adjFact;
adjFact._date = 19900101;
adjFact._factor = 1;
fctrLst.emplace_back(adjFact);
std::sort(fctrLst.begin(), fctrLst.end(), [](const AdjFactor& left, const AdjFactor& right) {
return left._date < right._date;
});
}
}
pipe_reader_log(_sink,LL_INFO, "{} adjusting factors of {} tickers loaded", fct_cnt, stk_cnt);
doc->release();
return true;
}
WTSTickSlice* WtDataReader::readTickSlice(const char* stdCode, uint32_t count, uint64_t etime /* = 0 */)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
WTSCommodityInfo* commInfo = _base_data_mgr->getCommodity(cInfo._exchg, cInfo._product);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
uint32_t curDate, curTime, curSecs;
if (etime == 0)
{
curDate = _sink->get_date();
curTime = _sink->get_min_time();
curSecs = _sink->get_secs();
etime = (uint64_t)curDate * 1000000000 + curTime * 100000 + curSecs;
}
else
{
//20190807124533900
curDate = (uint32_t)(etime / 1000000000);
curTime = (uint32_t)(etime % 1000000000) / 100000;
curSecs = (uint32_t)(etime % 100000);
}
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
uint32_t curTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), 0, 0, false);
bool isToday = (endTDate == curTDate);
std::string curCode = cInfo._code;
if (cInfo.isHot() && commInfo->isFuture())
curCode = _hot_mgr->getRawCode(cInfo._exchg, cInfo._product, endTDate);
else if (cInfo.isSecond() && commInfo->isFuture())
curCode = _hot_mgr->getSecondRawCode(cInfo._exchg, cInfo._product, endTDate);
//比较时间的对象
WTSTickStruct eTick;
eTick.action_date = curDate;
eTick.action_time = curTime * 100000 + curSecs;
if (isToday)
{
TickBlockPair* tPair = getRTTickBlock(cInfo._exchg, curCode.c_str());
if (tPair == NULL)
return NULL;
RTTickBlock* tBlock = tPair->_block;
// std::lower_bound() 是在区间内找到第一个大于等于 value 的值的位置并返回,如果没找到就返回 end() 位置
WTSTickStruct* pTick = std::lower_bound(tBlock->_ticks, tBlock->_ticks + (tBlock->_size - 1), eTick, [](const WTSTickStruct& a, const WTSTickStruct& b){
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pTick - tBlock->_ticks;
//如果光标定位的tick时间比目标时间打, 则全部回退一个
if (pTick->action_date > eTick.action_date || pTick->action_time>eTick.action_time)
{
pTick--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSTickSlice* slice = WTSTickSlice::create(stdCode, tBlock->_ticks + sIdx, cnt);
return slice;
}
else
{
std::string key = StrUtil::printf("%s-%d", stdCode, endTDate);
auto it = _his_tick_map.find(key);
if(it == _his_tick_map.end())
{
std::stringstream ss;
ss << _base_dir << "his/ticks/" << cInfo._exchg << "/" << endTDate << "/" << curCode << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
return NULL;
HisTBlockPair& tBlkPair = _his_tick_map[key];
StdFile::read_file_content(filename.c_str(), tBlkPair._buffer);
if (tBlkPair._buffer.size() < sizeof(HisTickBlock))
{
pipe_reader_log(_sink,LL_ERROR, "Sizechecking of his tick data file {} failed", filename);
tBlkPair._buffer.clear();
return NULL;
}
proc_block_data(tBlkPair._buffer, false, true);
tBlkPair._block = (HisTickBlock*)tBlkPair._buffer.c_str();
}
HisTBlockPair& tBlkPair = _his_tick_map[key];
if (tBlkPair._block == NULL)
return NULL;
HisTickBlock* tBlock = tBlkPair._block;
uint32_t tcnt = (tBlkPair._buffer.size() - sizeof(HisTickBlock)) / sizeof(WTSTickStruct);
if (tcnt <= 0)
return NULL;
WTSTickStruct* pTick = std::lower_bound(tBlock->_ticks, tBlock->_ticks + (tcnt - 1), eTick, [](const WTSTickStruct& a, const WTSTickStruct& b){
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pTick - tBlock->_ticks;
if (pTick->action_date > eTick.action_date || pTick->action_time >= eTick.action_time)
{
pTick--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSTickSlice* slice = WTSTickSlice::create(stdCode, tBlock->_ticks + sIdx, cnt);
return slice;
}
}
WTSOrdQueSlice* WtDataReader::readOrdQueSlice(const char* stdCode, uint32_t count, uint64_t etime /* = 0 */)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
WTSCommodityInfo* commInfo = _base_data_mgr->getCommodity(cInfo._exchg, cInfo._product);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
uint32_t curDate, curTime, curSecs;
if (etime == 0)
{
curDate = _sink->get_date();
curTime = _sink->get_min_time();
curSecs = _sink->get_secs();
etime = (uint64_t)curDate * 1000000000 + curTime * 100000 + curSecs;
}
else
{
//20190807124533900
curDate = (uint32_t)(etime / 1000000000);
curTime = (uint32_t)(etime % 1000000000) / 100000;
curSecs = (uint32_t)(etime % 100000);
}
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
uint32_t curTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), 0, 0, false);
bool isToday = (endTDate == curTDate);
std::string curCode = cInfo._code;
if (cInfo.isHot() && commInfo->isFuture())
curCode = _hot_mgr->getRawCode(cInfo._exchg, cInfo._product, endTDate);
else if (cInfo.isSecond() && commInfo->isFuture())
curCode = _hot_mgr->getSecondRawCode(cInfo._exchg, cInfo._product, endTDate);
//比较时间的对象
WTSOrdQueStruct eTick;
eTick.action_date = curDate;
eTick.action_time = curTime * 100000 + curSecs;
if (isToday)
{
OrdQueBlockPair* tPair = getRTOrdQueBlock(cInfo._exchg, curCode.c_str());
if (tPair == NULL)
return NULL;
RTOrdQueBlock* rtBlock = tPair->_block;
WTSOrdQueStruct* pItem = std::lower_bound(rtBlock->_queues, rtBlock->_queues + (rtBlock->_size - 1), eTick, [](const WTSOrdQueStruct& a, const WTSOrdQueStruct& b) {
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pItem - rtBlock->_queues;
//如果光标定位的tick时间比目标时间打, 则全部回退一个
if (pItem->action_date > eTick.action_date || pItem->action_time > eTick.action_time)
{
pItem--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSOrdQueSlice* slice = WTSOrdQueSlice::create(stdCode, rtBlock->_queues + sIdx, cnt);
return slice;
}
else
{
std::string key = StrUtil::printf("%s-%d", stdCode, endTDate);
auto it = _his_ordque_map.find(key);
if (it == _his_ordque_map.end())
{
std::stringstream ss;
ss << _base_dir << "his/queue/" << cInfo._exchg << "/" << endTDate << "/" << curCode << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
return NULL;
HisOrdQueBlockPair& hisBlkPair = _his_ordque_map[key];
StdFile::read_file_content(filename.c_str(), hisBlkPair._buffer);
if (hisBlkPair._buffer.size() < sizeof(HisOrdQueBlockV2))
{
pipe_reader_log(_sink,LL_ERROR, "历史委托队列数据文件{}大小校验失败", filename);
hisBlkPair._buffer.clear();
return NULL;
}
HisOrdQueBlockV2* tBlockV2 = (HisOrdQueBlockV2*)hisBlkPair._buffer.c_str();
if (hisBlkPair._buffer.size() != (sizeof(HisOrdQueBlockV2) + tBlockV2->_size))
{
pipe_reader_log(_sink,LL_ERROR, "历史委托队列数据文件{}大小校验失败", filename);
return NULL;
}
//需要解压
std::string buf = WTSCmpHelper::uncompress_data(tBlockV2->_data, (uint32_t)tBlockV2->_size);
//将原来的buffer只保留一个头部,并将所有tick数据追加到尾部
hisBlkPair._buffer.resize(sizeof(HisOrdQueBlock));
hisBlkPair._buffer.append(buf);
tBlockV2->_version = BLOCK_VERSION_RAW_V2;
hisBlkPair._block = (HisOrdQueBlock*)hisBlkPair._buffer.c_str();
}
HisOrdQueBlockPair& tBlkPair = _his_ordque_map[key];
if (tBlkPair._block == NULL)
return NULL;
HisOrdQueBlock* tBlock = tBlkPair._block;
uint32_t tcnt = (tBlkPair._buffer.size() - sizeof(HisOrdQueBlock)) / sizeof(WTSOrdQueStruct);
if (tcnt <= 0)
return NULL;
WTSOrdQueStruct* pTick = std::lower_bound(tBlock->_items, tBlock->_items + (tcnt - 1), eTick, [](const WTSOrdQueStruct& a, const WTSOrdQueStruct& b) {
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pTick - tBlock->_items;
if (pTick->action_date > eTick.action_date || pTick->action_time >= eTick.action_time)
{
pTick--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSOrdQueSlice* slice = WTSOrdQueSlice::create(stdCode, tBlock->_items + sIdx, cnt);
return slice;
}
}
WTSOrdDtlSlice* WtDataReader::readOrdDtlSlice(const char* stdCode, uint32_t count, uint64_t etime /* = 0 */)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
WTSCommodityInfo* commInfo = _base_data_mgr->getCommodity(cInfo._exchg, cInfo._product);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
uint32_t curDate, curTime, curSecs;
if (etime == 0)
{
curDate = _sink->get_date();
curTime = _sink->get_min_time();
curSecs = _sink->get_secs();
etime = (uint64_t)curDate * 1000000000 + curTime * 100000 + curSecs;
}
else
{
//20190807124533900
curDate = (uint32_t)(etime / 1000000000);
curTime = (uint32_t)(etime % 1000000000) / 100000;
curSecs = (uint32_t)(etime % 100000);
}
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
uint32_t curTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), 0, 0, false);
bool isToday = (endTDate == curTDate);
std::string curCode = cInfo._code;
if (cInfo.isHot() && commInfo->isFuture())
curCode = _hot_mgr->getRawCode(cInfo._exchg, cInfo._product, endTDate);
else if (cInfo.isSecond() && commInfo->isFuture())
curCode = _hot_mgr->getSecondRawCode(cInfo._exchg, cInfo._product, endTDate);
//比较时间的对象
WTSOrdDtlStruct eTick;
eTick.action_date = curDate;
eTick.action_time = curTime * 100000 + curSecs;
if (isToday)
{
OrdDtlBlockPair* tPair = getRTOrdDtlBlock(cInfo._exchg, curCode.c_str());
if (tPair == NULL)
return NULL;
RTOrdDtlBlock* rtBlock = tPair->_block;
WTSOrdDtlStruct* pItem = std::lower_bound(rtBlock->_details, rtBlock->_details + (rtBlock->_size - 1), eTick, [](const WTSOrdDtlStruct& a, const WTSOrdDtlStruct& b) {
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pItem - rtBlock->_details;
//如果光标定位的tick时间比目标时间打, 则全部回退一个
if (pItem->action_date > eTick.action_date || pItem->action_time > eTick.action_time)
{
pItem--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSOrdDtlSlice* slice = WTSOrdDtlSlice::create(stdCode, rtBlock->_details + sIdx, cnt);
return slice;
}
else
{
std::string key = StrUtil::printf("%s-%d", stdCode, endTDate);
auto it = _his_ordque_map.find(key);
if (it == _his_ordque_map.end())
{
std::stringstream ss;
ss << _base_dir << "his/orders/" << cInfo._exchg << "/" << endTDate << "/" << curCode << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
return NULL;
HisOrdDtlBlockPair& hisBlkPair = _his_orddtl_map[key];
StdFile::read_file_content(filename.c_str(), hisBlkPair._buffer);
if (hisBlkPair._buffer.size() < sizeof(HisOrdDtlBlockV2))
{
pipe_reader_log(_sink,LL_ERROR, "历史逐笔委托数据文件{}大小校验失败", filename.c_str());
hisBlkPair._buffer.clear();
return NULL;
}
HisOrdDtlBlockV2* tBlockV2 = (HisOrdDtlBlockV2*)hisBlkPair._buffer.c_str();
if (hisBlkPair._buffer.size() != (sizeof(HisOrdDtlBlockV2) + tBlockV2->_size))
{
pipe_reader_log(_sink,LL_ERROR, "历史逐笔委托数据文件{}大小校验失败", filename.c_str());
return NULL;
}
//需要解压
std::string buf = WTSCmpHelper::uncompress_data(tBlockV2->_data, (uint32_t)tBlockV2->_size);
//将原来的buffer只保留一个头部,并将所有tick数据追加到尾部
hisBlkPair._buffer.resize(sizeof(HisOrdDtlBlock));
hisBlkPair._buffer.append(buf);
tBlockV2->_version = BLOCK_VERSION_RAW_V2;
hisBlkPair._block = (HisOrdDtlBlock*)hisBlkPair._buffer.c_str();
}
HisOrdDtlBlockPair& tBlkPair = _his_orddtl_map[key];
if (tBlkPair._block == NULL)
return NULL;
HisOrdDtlBlock* tBlock = tBlkPair._block;
uint32_t tcnt = (tBlkPair._buffer.size() - sizeof(HisOrdDtlBlock)) / sizeof(WTSOrdDtlStruct);
if (tcnt <= 0)
return NULL;
WTSOrdDtlStruct* pTick = std::lower_bound(tBlock->_items, tBlock->_items + (tcnt - 1), eTick, [](const WTSOrdDtlStruct& a, const WTSOrdDtlStruct& b) {
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pTick - tBlock->_items;
if (pTick->action_date > eTick.action_date || pTick->action_time >= eTick.action_time)
{
pTick--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSOrdDtlSlice* slice = WTSOrdDtlSlice::create(stdCode, tBlock->_items + sIdx, cnt);
return slice;
}
}
WTSTransSlice* WtDataReader::readTransSlice(const char* stdCode, uint32_t count, uint64_t etime /* = 0 */)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
WTSCommodityInfo* commInfo = _base_data_mgr->getCommodity(cInfo._exchg, cInfo._product);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
uint32_t curDate, curTime, curSecs;
if (etime == 0)
{
curDate = _sink->get_date();
curTime = _sink->get_min_time();
curSecs = _sink->get_secs();
etime = (uint64_t)curDate * 1000000000 + curTime * 100000 + curSecs;
}
else
{
//20190807124533900
curDate = (uint32_t)(etime / 1000000000);
curTime = (uint32_t)(etime % 1000000000) / 100000;
curSecs = (uint32_t)(etime % 100000);
}
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
uint32_t curTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), 0, 0, false);
bool isToday = (endTDate == curTDate);
std::string curCode = cInfo._code;
if (cInfo.isHot() && commInfo->isFuture())
curCode = _hot_mgr->getRawCode(cInfo._exchg, cInfo._product, endTDate);
else if (cInfo.isSecond() && commInfo->isFuture())
curCode = _hot_mgr->getSecondRawCode(cInfo._exchg, cInfo._product, endTDate);
//比较时间的对象
WTSTransStruct eTick;
eTick.action_date = curDate;
eTick.action_time = curTime * 100000 + curSecs;
if (isToday)
{
TransBlockPair* tPair = getRTTransBlock(cInfo._exchg, curCode.c_str());
if (tPair == NULL)
return NULL;
RTTransBlock* rtBlock = tPair->_block;
WTSTransStruct* pItem = std::lower_bound(rtBlock->_trans, rtBlock->_trans + (rtBlock->_size - 1), eTick, [](const WTSTransStruct& a, const WTSTransStruct& b) {
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pItem - rtBlock->_trans;
//如果光标定位的tick时间比目标时间打, 则全部回退一个
if (pItem->action_date > eTick.action_date || pItem->action_time > eTick.action_time)
{
pItem--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSTransSlice* slice = WTSTransSlice::create(stdCode, rtBlock->_trans + sIdx, cnt);
return slice;
}
else
{
std::string key = StrUtil::printf("%s-%d", stdCode, endTDate);
auto it = _his_ordque_map.find(key);
if (it == _his_ordque_map.end())
{
std::stringstream ss;
ss << _base_dir << "his/trans/" << cInfo._exchg << "/" << endTDate << "/" << curCode << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
return NULL;
HisTransBlockPair& hisBlkPair = _his_trans_map[key];
StdFile::read_file_content(filename.c_str(), hisBlkPair._buffer);
if (hisBlkPair._buffer.size() < sizeof(HisTransBlockV2))
{
pipe_reader_log(_sink,LL_ERROR, "历史逐笔成交数据文件{}大小校验失败", filename.c_str());
hisBlkPair._buffer.clear();
return NULL;
}
HisTransBlockV2* tBlockV2 = (HisTransBlockV2*)hisBlkPair._buffer.c_str();
if (hisBlkPair._buffer.size() != (sizeof(HisTransBlockV2) + tBlockV2->_size))
{
pipe_reader_log(_sink,LL_ERROR, "历史逐笔成交数据文件{}大小校验失败", filename.c_str());
return NULL;
}
//需要解压
std::string buf = WTSCmpHelper::uncompress_data(tBlockV2->_data, (uint32_t)tBlockV2->_size);
//将原来的buffer只保留一个头部,并将所有tick数据追加到尾部
hisBlkPair._buffer.resize(sizeof(HisTransBlock));
hisBlkPair._buffer.append(buf);
tBlockV2->_version = BLOCK_VERSION_RAW_V2;
hisBlkPair._block = (HisTransBlock*)hisBlkPair._buffer.c_str();
}
HisTransBlockPair& tBlkPair = _his_trans_map[key];
if (tBlkPair._block == NULL)
return NULL;
HisTransBlock* tBlock = tBlkPair._block;
uint32_t tcnt = (tBlkPair._buffer.size() - sizeof(HisTransBlock)) / sizeof(WTSTransStruct);
if (tcnt <= 0)
return NULL;
WTSTransStruct* pTick = std::lower_bound(tBlock->_items, tBlock->_items + (tcnt - 1), eTick, [](const WTSTransStruct& a, const WTSTransStruct& b) {
if (a.action_date != b.action_date)
return a.action_date < b.action_date;
else
return a.action_time < b.action_time;
});
uint32_t eIdx = pTick - tBlock->_items;
if (pTick->action_date > eTick.action_date || pTick->action_time >= eTick.action_time)
{
pTick--;
eIdx--;
}
uint32_t cnt = min(eIdx + 1, count);
uint32_t sIdx = eIdx + 1 - cnt;
WTSTransSlice* slice = WTSTransSlice::create(stdCode, tBlock->_items + sIdx, cnt);
return slice;
}
}
bool WtDataReader::cacheFinalBarsFromLoader(const std::string& key, const char* stdCode, WTSKlinePeriod period)
{
if (NULL == _loader)
return false;
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
BarsList& barList = _bars_cache[key];
barList._code = stdCode;
barList._period = period;
barList._exchg = cInfo._exchg;
std::string pname;
switch (period)
{
case KP_Minute1: pname = "m1"; break;
case KP_Minute5: pname = "m5"; break;
case KP_DAY: pname = "d"; break;
default: pname = ""; break;
}
pipe_reader_log(_sink,LL_INFO, "Reading final bars of {} via extended loader...", stdCode);
bool ret = _loader->loadFinalHisBars(&barList, stdCode, period, [](void* obj, WTSBarStruct* firstBar, uint32_t count) {
BarsList* bars = (BarsList*)obj;
bars->_factor = 1.0;
bars->_bars.resize(count);
memcpy(bars->_bars.data(), firstBar, sizeof(WTSBarStruct)*count);
});
if(ret)
pipe_reader_log(_sink,LL_INFO, "{} items of back {} data of {} loaded via extended loader", barList._bars.size(), pname.c_str(), stdCode);
return ret;
}
bool WtDataReader::cacheIntegratedFutBars(const std::string& key, const char* stdCode, WTSKlinePeriod period)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
uint32_t curDate = TimeUtils::getCurDate();
uint32_t curTime = TimeUtils::getCurMin() / 100;
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
std::string pname;
switch (period)
{
case KP_Minute1: pname = "min1"; break;
case KP_Minute5: pname = "min5"; break;
default: pname = "day"; break;
}
BarsList& barList = _bars_cache[key];
barList._code = stdCode;
barList._period = period;
barList._exchg = cInfo._exchg;
std::vector<std::vector<WTSBarStruct>*> barsSections;
uint32_t realCnt = 0;
const char* hot_flag = cInfo.isHot() ? FILE_SUF_HOT : FILE_SUF_2ND;
//先按照HOT代码进行读取, 如rb.HOT
std::vector<WTSBarStruct>* hotAy = NULL;
uint64_t lastHotTime = 0;
do
{
/*
* By Wesley @ 2021.12.20
* 本来这里是要先调用_loader->loadRawHisBars从外部加载器读取主力合约数据的
* 但是上层会调用一次loadFinalHisBars,这里再调用loadRawHisBars就冗余了,所以直接跳过
*/
std::stringstream ss;
ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << cInfo._exchg << "." << cInfo._product << hot_flag << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
break;
std::string content;
StdFile::read_file_content(filename.c_str(), content);
if (content.size() < sizeof(HisKlineBlock))
{
pipe_reader_log(_sink,LL_ERROR, "历史K线数据文件{}大小校验失败", filename);
break;
}
proc_block_data(content, true, false);
if(content.empty())
break;
uint32_t barcnt = content.size() / sizeof(WTSBarStruct);
hotAy = new std::vector<WTSBarStruct>();
hotAy->resize(barcnt);
memcpy(hotAy->data(), content.data(), content.size());
if (period != KP_DAY)
lastHotTime = hotAy->at(barcnt - 1).time;
else
lastHotTime = hotAy->at(barcnt - 1).date;
pipe_reader_log(_sink,LL_INFO, "{} items of back {} data of wrapped contract {} directly loaded", barcnt, pname.c_str(), stdCode);
} while (false);
HotSections secs;
if (cInfo.isHot())
{
if (!_hot_mgr->splitHotSecions(cInfo._exchg, cInfo._product, 19900102, endTDate, secs))
return false;
}
else if (cInfo.isSecond())
{
if (!_hot_mgr->splitSecondSecions(cInfo._exchg, cInfo._product, 19900102, endTDate, secs))
return false;
}
if (secs.empty())
return false;
bool bAllCovered = false;
for (auto it = secs.rbegin(); it != secs.rend(); it++)
{
const HotSection& hotSec = *it;
const char* curCode = hotSec._code.c_str();
uint32_t rightDt = hotSec._e_date;
uint32_t leftDt = hotSec._s_date;
//要先将日期转换为边界时间
WTSBarStruct sBar, eBar;
if (period != KP_DAY)
{
uint64_t sTime = _base_data_mgr->getBoundaryTime(stdPID.c_str(), leftDt, false, true);
uint64_t eTime = _base_data_mgr->getBoundaryTime(stdPID.c_str(), rightDt, false, false);
sBar.date = leftDt;
sBar.time = ((uint32_t)(sTime / 10000) - 19900000) * 10000 + (uint32_t)(sTime % 10000);
if (sBar.time < lastHotTime) //如果边界时间小于主力的最后一根Bar的时间, 说明已经有交叉了, 则不需要再处理了
{
bAllCovered = true;
sBar.time = lastHotTime + 1;
}
eBar.date = rightDt;
eBar.time = ((uint32_t)(eTime / 10000) - 19900000) * 10000 + (uint32_t)(eTime % 10000);
if (eBar.time <= lastHotTime) //右边界时间小于最后一条Hot时间, 说明全部交叉了, 没有再找的必要了
break;
}
else
{
sBar.date = leftDt;
if (sBar.date < lastHotTime) //如果边界时间小于主力的最后一根Bar的时间, 说明已经有交叉了, 则不需要再处理了
{
bAllCovered = true;
sBar.date = (uint32_t)lastHotTime + 1;
}
eBar.date = rightDt;
if (eBar.date <= lastHotTime)
break;
}
/*
* By Wesley @ 2021.12.20
* 先从extloader读取分月合约的K线数据
* 如果没有读到,再从文件读取
*/
bool bLoaded = false;
std::string buffer;
if (NULL != _loader)
{
std::string wCode = StrUtil::printf("%s.%s.%s", cInfo._exchg, cInfo._product, (char*)curCode + strlen(cInfo._product));
bLoaded = _loader->loadRawHisBars(&buffer, wCode.c_str(), period, [](void* obj, WTSBarStruct* bars, uint32_t count) {
std::string* buff = (std::string*)obj;
buff->resize(sizeof(WTSBarStruct)*count);
memcpy((void*)buff->c_str(), bars, sizeof(WTSBarStruct)*count);
});
}
if (!bLoaded)
{
std::stringstream ss;
ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << curCode << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
continue;
std::string content;
StdFile::read_file_content(filename.c_str(), content);
if (content.size() < sizeof(HisKlineBlock))
{
pipe_reader_log(_sink, LL_ERROR, "Sizechecking of his dta file {} failed", filename.c_str());
return false;
}
proc_block_data(content, true, false);
buffer.swap(content);
}
if(buffer.empty())
break;
uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
WTSBarStruct* firstBar = (WTSBarStruct*)buffer.data();
WTSBarStruct* pBar = std::lower_bound(firstBar, firstBar + (barcnt - 1), sBar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
if (period == KP_DAY)
{
return a.date < b.date;
}
else
{
return a.time < b.time;
}
});
uint32_t sIdx = pBar - firstBar;
if ((period == KP_DAY && pBar->date < sBar.date) || (period != KP_DAY && pBar->time < sBar.time)) //早于边界时间
{
//早于边界时间, 说明没有数据了, 因为lower_bound会返回大于等于目标位置的数据
continue;
}
pBar = std::lower_bound(firstBar + sIdx, firstBar + (barcnt - 1), eBar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
if (period == KP_DAY)
{
return a.date < b.date;
}
else
{
return a.time < b.time;
}
});
uint32_t eIdx = pBar - firstBar;
if ((period == KP_DAY && pBar->date > eBar.date) || (period != KP_DAY && pBar->time > eBar.time))
{
pBar--;
eIdx--;
}
if (eIdx < sIdx)
continue;
uint32_t curCnt = eIdx - sIdx + 1;
std::vector<WTSBarStruct>* tempAy = new std::vector<WTSBarStruct>();
tempAy->resize(curCnt);
memcpy(tempAy->data(), &firstBar[sIdx], sizeof(WTSBarStruct)*curCnt);
realCnt += curCnt;
barsSections.emplace_back(tempAy);
if (bAllCovered)
break;
}
if (hotAy)
{
barsSections.emplace_back(hotAy);
realCnt += hotAy->size();
}
if (realCnt > 0)
{
barList._bars.resize(realCnt);
uint32_t curIdx = 0;
for (auto it = barsSections.rbegin(); it != barsSections.rend(); it++)
{
std::vector<WTSBarStruct>* tempAy = *it;
memcpy(barList._bars.data() + curIdx, tempAy->data(), tempAy->size() * sizeof(WTSBarStruct));
curIdx += tempAy->size();
delete tempAy;
}
barsSections.clear();
}
pipe_reader_log(_sink,LL_INFO, "{} items of back {} data of {} cached", realCnt, pname.c_str(), stdCode);
return true;
}
bool WtDataReader::cacheAdjustedStkBars(const std::string& key, const char* stdCode, WTSKlinePeriod period)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
uint32_t curDate = TimeUtils::getCurDate();
uint32_t curTime = TimeUtils::getCurMin() / 100;
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
std::string pname;
switch (period)
{
case KP_Minute1: pname = "min1"; break;
case KP_Minute5: pname = "min5"; break;
default: pname = "day"; break;
}
BarsList& barList = _bars_cache[key];
barList._code = stdCode;
barList._period = period;
barList._exchg = cInfo._exchg;
std::vector<std::vector<WTSBarStruct>*> barsSections;
uint32_t realCnt = 0;
std::vector<WTSBarStruct>* ayAdjusted = NULL;
uint64_t lastQTime = 0;
do
{
/*
* By Wesley @ 2021.12.20
* 本来这里是要先调用_loader->loadRawHisBars从外部加载器读取复权数据的
* 但是上层会调用一次loadFinalHisBars,这里再调用loadRawHisBars就冗余了,所以直接跳过
*/
char flag = cInfo._exright == 1 ? SUFFIX_QFQ : SUFFIX_HFQ;
std::stringstream ss;
ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << cInfo._code << flag << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
break;
std::string content;
StdFile::read_file_content(filename.c_str(), content);
if (content.size() < sizeof(HisKlineBlock))
{
pipe_reader_log(_sink,LL_ERROR, "历史K线数据文件{}大小校验失败", filename.c_str());
break;
}
proc_block_data(content, true, false);
uint32_t barcnt = content.size() / sizeof(WTSBarStruct);
ayAdjusted = new std::vector<WTSBarStruct>();
ayAdjusted->resize(barcnt);
memcpy(ayAdjusted->data(), content.data(), content.size());
if (period != KP_DAY)
lastQTime = ayAdjusted->at(barcnt - 1).time;
else
lastQTime = ayAdjusted->at(barcnt - 1).date;
pipe_reader_log(_sink,LL_INFO, "{} items of adjusted back {} data of stock {} directly loaded", barcnt, pname.c_str(), stdCode);
} while (false);
bool bAllCovered = false;
do
{
const char* curCode = cInfo._code;
//要先将日期转换为边界时间
WTSBarStruct sBar;
if (period != KP_DAY)
{
sBar.date = TimeUtils::minBarToDate(lastQTime);
sBar.time = lastQTime + 1;
}
else
{
sBar.date = (uint32_t)lastQTime + 1;
}
/*
* By Wesley @ 2021.12.20
* 先从extloader读取
* 如果没有读到,再从文件读取
*/
bool bLoaded = false;
std::string buffer;
std::string rawCode = StrUtil::printf("%s.%s.%s", cInfo._exchg, cInfo._product, curCode);
if (NULL != _loader)
{
bLoaded = _loader->loadRawHisBars(&buffer, rawCode.c_str(), period, [](void* obj, WTSBarStruct* bars, uint32_t count) {
std::string* buff = (std::string*)obj;
buff->resize(sizeof(WTSBarStruct)*count);
memcpy((void*)buff->c_str(), bars, sizeof(WTSBarStruct)*count);
});
}
bool bOldVer = false;
if (!bLoaded)
{
std::stringstream ss;
ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << curCode << ".dsb";
std::string filename = ss.str();
if (!StdFile::exists(filename.c_str()))
continue;
std::string content;
StdFile::read_file_content(filename.c_str(), content);
if (content.size() < sizeof(HisKlineBlock))
{
pipe_reader_log(_sink,LL_ERROR, "历史K线数据文件{}大小校验失败", filename.c_str());
return false;
}
proc_block_data(content, true, false);
buffer.swap(content);
}
if(buffer.empty())
break;
uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
WTSBarStruct* firstBar = (WTSBarStruct*)buffer.data();
WTSBarStruct* pBar = std::lower_bound(firstBar, firstBar + (barcnt - 1), sBar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
if (period == KP_DAY)
{
return a.date < b.date;
}
else
{
return a.time < b.time;
}
});
if (pBar != NULL)
{
uint32_t sIdx = pBar - firstBar;
uint32_t curCnt = barcnt - sIdx;
std::vector<WTSBarStruct>* ayRaw = new std::vector<WTSBarStruct>();
ayRaw->resize(curCnt);
memcpy(ayRaw->data(), &firstBar[sIdx], sizeof(WTSBarStruct)*curCnt);
realCnt += curCnt;
auto& ayFactors = getAdjFactors(cInfo._code, cInfo._exchg, cInfo._product);
if (!ayFactors.empty())
{
//做复权处理
int32_t lastIdx = curCnt;
WTSBarStruct bar;
firstBar = ayRaw->data();
//根据复权类型确定基础因子
//如果是前复权,则历史数据会变小,以最后一个复权因子为基础因子
//如果是后复权,则新数据会变大,基础因子为1
double baseFactor = 1.0;
if (cInfo._exright == 1)
baseFactor = ayFactors.back()._factor;
else if (cInfo._exright == 2)
barList._factor = ayFactors.back()._factor;
for (auto it = ayFactors.rbegin(); it != ayFactors.rend(); it++)
{
const AdjFactor& adjFact = *it;
bar.date = adjFact._date;
//调整因子
double factor = adjFact._factor / baseFactor;
WTSBarStruct* pBar = NULL;
pBar = std::lower_bound(firstBar, firstBar + lastIdx - 1, bar, [period](const WTSBarStruct& a, const WTSBarStruct& b) {
return a.date < b.date;
});
if (pBar->date < bar.date)
continue;
WTSBarStruct* endBar = pBar;
if (pBar != NULL)
{
int32_t curIdx = pBar - firstBar;
while (pBar && curIdx < lastIdx)
{
pBar->open *= factor;
pBar->high *= factor;
pBar->low *= factor;
pBar->close *= factor;
pBar++;
curIdx++;
}
lastIdx = endBar - firstBar;
}
if (lastIdx == 0)
break;
}
}
barsSections.emplace_back(ayRaw);
}
} while (false);
if (ayAdjusted)
{
barsSections.emplace_back(ayAdjusted);
realCnt += ayAdjusted->size();
}
if (realCnt > 0)
{
barList._bars.resize(realCnt);
uint32_t curIdx = 0;
for (auto it = barsSections.rbegin(); it != barsSections.rend(); it++)
{
std::vector<WTSBarStruct>* tempAy = *it;
memcpy(barList._bars.data() + curIdx, tempAy->data(), tempAy->size() * sizeof(WTSBarStruct));
curIdx += tempAy->size();
delete tempAy;
}
barsSections.clear();
}
pipe_reader_log(_sink,LL_INFO, "{} items of back {} data of {} cached", realCnt, pname.c_str(), stdCode);
return true;
}
bool WtDataReader::cacheHisBarsFromFile(const std::string& key, const char* stdCode, WTSKlinePeriod period)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
WTSCommodityInfo* commInfo = _base_data_mgr->getCommodity(cInfo._exchg, cInfo._product);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
uint32_t curDate = TimeUtils::getCurDate();
uint32_t curTime = TimeUtils::getCurMin() / 100;
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
std::string pname;
switch (period)
{
case KP_Minute1: pname = "min1"; break;
case KP_Minute5: pname = "min5"; break;
default: pname = "day"; break;
}
BarsList& barList = _bars_cache[key];
barList._code = stdCode;
barList._period = period;
barList._exchg = cInfo._exchg;
std::vector<std::vector<WTSBarStruct>*> barsSections;
uint32_t realCnt = 0;
if (!cInfo.isFlat() && commInfo->isFuture())
{
//如果是读取期货主力连续数据
return cacheIntegratedFutBars(key, stdCode, period);
}
else if(cInfo.isExright() && commInfo->isStock())
{
//如果是读取股票复权数据
return cacheAdjustedStkBars(key, stdCode, period);
}
//直接原始数据直接加载
/*
* By Wesley @ 2021.12.20
* 先从extloader读取
* 如果没有读到,再从文件读取
*/
bool bLoaded = false;
std::string buffer;
if (NULL != _loader)
{
bLoaded = _loader->loadRawHisBars(&buffer, stdCode, period, [](void* obj, WTSBarStruct* bars, uint32_t count) {
std::string* buff = (std::string*)obj;
buff->resize(sizeof(WTSBarStruct)*count);
memcpy((void*)buff->c_str(), bars, sizeof(WTSBarStruct)*count);
});
}
if (!bLoaded)
{
//读取历史的
std::stringstream ss;
ss << _base_dir << "his/" << pname << "/" << cInfo._exchg << "/" << cInfo._code << ".dsb";
std::string filename = ss.str();
if (StdFile::exists(filename.c_str()))
{
//如果有格式化的历史数据文件, 则直接读取
std::string content;
StdFile::read_file_content(filename.c_str(), content);
if (content.size() < sizeof(HisKlineBlock))
{
pipe_reader_log(_sink,LL_ERROR, "历史K线数据文件{}大小校验失败", filename.c_str());
return false;
}
proc_block_data(content, true, false);
buffer.swap(content);
}
}
if (buffer.empty())
return false;
uint32_t barcnt = buffer.size() / sizeof(WTSBarStruct);
WTSBarStruct* firstBar = (WTSBarStruct*)buffer.data();
if (barcnt > 0)
{
uint32_t sIdx = 0;
uint32_t idx = barcnt - 1;
uint32_t curCnt = (idx - sIdx + 1);
std::vector<WTSBarStruct>* tempAy = new std::vector<WTSBarStruct>();
tempAy->resize(curCnt);
memcpy(tempAy->data(), &firstBar[sIdx], sizeof(WTSBarStruct)*curCnt);
realCnt += curCnt;
barsSections.emplace_back(tempAy);
}
if (realCnt > 0)
{
barList._bars.resize(realCnt);
uint32_t curIdx = 0;
for (auto it = barsSections.rbegin(); it != barsSections.rend(); it++)
{
std::vector<WTSBarStruct>* tempAy = *it;
memcpy(barList._bars.data() + curIdx, tempAy->data(), tempAy->size()*sizeof(WTSBarStruct));
curIdx += tempAy->size();
delete tempAy;
}
barsSections.clear();
}
pipe_reader_log(_sink,LL_INFO, "{} items of back {} data of {} cached", realCnt, pname.c_str(), stdCode);
return true;
}
WTSKlineSlice* WtDataReader::readKlineSlice(const char* stdCode, WTSKlinePeriod period, uint32_t count, uint64_t etime /* = 0 */)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
WTSCommodityInfo* commInfo = _base_data_mgr->getCommodity(cInfo._exchg, cInfo._product);
std::string stdPID = StrUtil::printf("%s.%s", cInfo._exchg, cInfo._product);
std::string key = StrUtil::printf("%s#%u", stdCode, period);
auto it = _bars_cache.find(key);
bool bHasHisData = false;
if (it == _bars_cache.end())
{
/*
* By Wesley @ 2021.12.20
* 先从extloader加载最终的K线数据(如果是复权)
* 如果加载失败,则再从文件加载K线数据
*/
bHasHisData = cacheFinalBarsFromLoader(key, stdCode, period);
if(!bHasHisData)
bHasHisData = cacheHisBarsFromFile(key, stdCode, period);
}
else
{
bHasHisData = true;
}
uint32_t curDate, curTime;
if (etime == 0)
{
curDate = _sink->get_date();
curTime = _sink->get_min_time();
etime = (uint64_t)curDate * 10000 + curTime;
}
else
{
curDate = (uint32_t)(etime / 10000);
curTime = (uint32_t)(etime % 10000);
}
uint32_t endTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), curDate, curTime, false);
uint32_t curTDate = _base_data_mgr->calcTradingDate(stdPID.c_str(), 0, 0, false);
WTSBarStruct* hisHead = NULL;
WTSBarStruct* rtHead = NULL;
uint32_t hisCnt = 0;
uint32_t rtCnt = 0;
std::string pname;
switch (period)
{
case KP_Minute1: pname = "min1"; break;
case KP_Minute5: pname = "min5"; break;
default: pname = "day"; break;
}
uint32_t left = count;
//是否包含当天的
bool bHasToday = (endTDate == curTDate);
if (cInfo.isHot() && commInfo->isFuture())
{
_bars_cache[key]._raw_code = _hot_mgr->getRawCode(cInfo._exchg, cInfo._product, curTDate);
pipe_reader_log(_sink,LL_INFO, "Hot contract on {} confirmed: {} -> {}", curTDate, stdCode, _bars_cache[key]._raw_code.c_str());
}
else if (cInfo.isSecond() && commInfo->isFuture())
{
_bars_cache[key]._raw_code = _hot_mgr->getSecondRawCode(cInfo._exchg, cInfo._product, curTDate);
pipe_reader_log(_sink,LL_INFO, "Second contract on {} confirmed: {} -> {}", curTDate, stdCode, _bars_cache[key]._raw_code.c_str());
}
else
{
_bars_cache[key]._raw_code = cInfo._code;
}
if (bHasToday)
{
WTSBarStruct bar;
bar.date = curDate;
bar.time = (curDate - 19900000) * 10000 + curTime;
const char* curCode = _bars_cache[key]._raw_code.c_str();
//读取实时的
RTKlineBlockPair* kPair = getRTKilneBlock(cInfo._exchg, curCode, period);
if (kPair != NULL)
{
//读取当日的数据
WTSBarStruct* pBar = NULL;
pBar = std::lower_bound(kPair->_block->_bars, kPair->_block->_bars + (kPair->_block->_size - 1), bar, [period](const WTSBarStruct& a, const WTSBarStruct& b){
if (period == KP_DAY)
return a.date < b.date;
else
return a.time < b.time;
});
uint32_t idx = 0;
if (pBar != NULL)
idx = pBar - kPair->_block->_bars;
else
idx = kPair->_block->_size;
if ((period == KP_DAY && pBar->date > bar.date) || (period != KP_DAY && pBar->time > bar.time))
{
pBar--;
idx--;
}
uint32_t sIdx = 0;
if (left <= idx + 1)
{
sIdx = idx - left + 1;
}
uint32_t curCnt = (idx - sIdx + 1);
left -= (idx - sIdx + 1);
if(cInfo._exright == 2 && commInfo->isStock())
{
//后复权数据要把最新的数据进行复权处理,所以要作为历史数据追加到尾部
//虽然后复权数据要进行复权处理,但是实时数据的位置标记也要更新到最新,不然OnMinuteEnd会从开盘开始回放的
_bars_cache[key]._rt_cursor = idx;
BarsList& barsList = _bars_cache[key];
double factor = barsList._factor;
uint32_t oldSize = barsList._bars.size();
uint32_t newSize = oldSize + curCnt;
barsList._bars.resize(newSize);
memcpy(&barsList._bars[oldSize], &kPair->_block->_bars[sIdx], sizeof(WTSBarStruct)*curCnt);
for(uint32_t thisIdx = oldSize; thisIdx < newSize; thisIdx++)
{
WTSBarStruct* pBar = &barsList._bars[thisIdx];
pBar->open *= factor;
pBar->high *= factor;
pBar->low *= factor;
pBar->close *= factor;
}
}
else
{
_bars_cache[key]._rt_cursor = idx;
rtHead = &kPair->_block->_bars[sIdx];
rtCnt = curCnt;
}
}
}
if (left > 0 && bHasHisData)
{
hisCnt = left;
//历史数据,直接从缓存的历史数据尾部截取
BarsList& barList = _bars_cache[key];
hisCnt = min(hisCnt, (uint32_t)barList._bars.size());
hisHead = &barList._bars[barList._bars.size() - hisCnt];//indexBarFromCache(key, etime, hisCnt, period == KP_DAY);
}
pipe_reader_log(_sink, LL_DEBUG, "His {} bars of {} loaded, {} from history, {} from realtime", PERIOD_NAME[period], stdCode, hisCnt, rtCnt);
if (hisCnt + rtCnt > 0)
{
WTSKlineSlice* slice = WTSKlineSlice::create(stdCode, period, 1, hisHead, hisCnt);
if (rtCnt > 0)
slice->appendBlock(rtHead, rtCnt);
return slice;
}
return NULL;
}
// 获取tick数据块
WtDataReader::TickBlockPair* WtDataReader::getRTTickBlock(const char* exchg, const char* code)
{
std::string key = StrUtil::printf("%s.%s", exchg, code);
std::string path = StrUtil::printf("%srt/ticks/%s/%s.dmb", _base_dir.c_str(), exchg, code);
if (!StdFile::exists(path.c_str()))
return NULL;
TickBlockPair& block = _rt_tick_map[key];
if (block._file == NULL || block._block == NULL)
{
if (block._file == NULL)
{
block._file.reset(new BoostMappingFile());
}
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTTickBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
else if (block._last_cap != block._block->_capacity)
{
//说明文件大小已变, 需要重新映射
block._file.reset(new BoostMappingFile());
block._last_cap = 0;
block._block = NULL;
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTTickBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
return █
}
WtDataReader::OrdDtlBlockPair* WtDataReader::getRTOrdDtlBlock(const char* exchg, const char* code)
{
std::string key = StrUtil::printf("%s.%s", exchg, code);
std::string path = StrUtil::printf("%srt/orders/%s/%s.dmb", _base_dir.c_str(), exchg, code);
if (!StdFile::exists(path.c_str()))
return NULL;
OrdDtlBlockPair& block = _rt_orddtl_map[key];
if (block._file == NULL || block._block == NULL)
{
if (block._file == NULL)
{
block._file.reset(new BoostMappingFile());
}
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTOrdDtlBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
else if (block._last_cap != block._block->_capacity)
{
//说明文件大小已变, 需要重新映射
block._file.reset(new BoostMappingFile());
block._last_cap = 0;
block._block = NULL;
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTOrdDtlBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
return █
}
WtDataReader::OrdQueBlockPair* WtDataReader::getRTOrdQueBlock(const char* exchg, const char* code)
{
std::string key = StrUtil::printf("%s.%s", exchg, code);
std::string path = StrUtil::printf("%srt/queue/%s/%s.dmb", _base_dir.c_str(), exchg, code);
if (!StdFile::exists(path.c_str()))
return NULL;
OrdQueBlockPair& block = _rt_ordque_map[key];
if (block._file == NULL || block._block == NULL)
{
if (block._file == NULL)
{
block._file.reset(new BoostMappingFile());
}
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTOrdQueBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
else if (block._last_cap != block._block->_capacity)
{
//说明文件大小已变, 需要重新映射
block._file.reset(new BoostMappingFile());
block._last_cap = 0;
block._block = NULL;
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTOrdQueBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
return █
}
WtDataReader::TransBlockPair* WtDataReader::getRTTransBlock(const char* exchg, const char* code)
{
std::string key = StrUtil::printf("%s.%s", exchg, code);
std::string path = StrUtil::printf("%srt/trans/%s/%s.dmb", _base_dir.c_str(), exchg, code);
if (!StdFile::exists(path.c_str()))
return NULL;
TransBlockPair& block = _rt_trans_map[key];
if (block._file == NULL || block._block == NULL)
{
if (block._file == NULL)
{
block._file.reset(new BoostMappingFile());
}
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTTransBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
else if (block._last_cap != block._block->_capacity)
{
//说明文件大小已变, 需要重新映射
block._file.reset(new BoostMappingFile());
block._last_cap = 0;
block._block = NULL;
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTTransBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
return █
}
WtDataReader::RTKlineBlockPair* WtDataReader::getRTKilneBlock(const char* exchg, const char* code, WTSKlinePeriod period)
{
// 周期必须是1min和5min
if (period != KP_Minute1 && period != KP_Minute5)
return NULL;
std::string key = StrUtil::printf("%s.%s", exchg, code);
// K线数据块映射
RTKBlockFilesMap* cache_map = NULL;
std::string subdir = "";
BlockType bType;
switch (period)
{
case KP_Minute1:
cache_map = &_rt_min1_map;
subdir = "min1";
bType = BT_RT_Minute1;
break;
case KP_Minute5:
cache_map = &_rt_min5_map;
subdir = "min5";
bType = BT_RT_Minute5;
break;
default: break;
}
// 数据块文件名
std::string path = StrUtil::printf("%srt/%s/%s/%s.dmb", _base_dir.c_str(), subdir.c_str(), exchg, code);
if (!StdFile::exists(path.c_str()))
return NULL;
RTKlineBlockPair& block = (*cache_map)[key];
if (block._file == NULL || block._block == NULL)
{
if (block._file == NULL)
{
block._file.reset(new BoostMappingFile());
}
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTKlineBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
else if (block._last_cap != block._block->_capacity)
{
//说明文件大小已变, 需要重新映射
pipe_reader_log(_sink, LL_DEBUG, "RT {} block of {}.{} expanded to {}, remapping...", subdir.c_str(), exchg, code, block._block->_capacity);
block._file.reset(new BoostMappingFile());
block._last_cap = 0;
block._block = NULL;
if (!block._file->map(path.c_str(), boost::interprocess::read_only, boost::interprocess::read_only))
return NULL;
block._block = (RTKlineBlock*)block._file->addr();
block._last_cap = block._block->_capacity;
}
pipe_reader_log(_sink, LL_DEBUG, "RT {} block of {}.{} loaded", subdir.c_str(), exchg, code);
return █
}
// K线闭合
void WtDataReader::onMinuteEnd(uint32_t uDate, uint32_t uTime, uint32_t endTDate /* = 0 */)
{
// 这里应该触发检查
uint64_t nowTime = (uint64_t)uDate * 10000 + uTime;
// 每分钟执行一次
if (nowTime <= _last_time)
return;
for (auto it = _bars_cache.begin(); it != _bars_cache.end(); it++)
{
BarsList& barsList = (BarsList&)it->second;
if (barsList._period != KP_DAY)
{
if (!barsList._raw_code.empty())
{
RTKlineBlockPair* kBlk = getRTKilneBlock(barsList._exchg.c_str(), barsList._raw_code.c_str(), barsList._period);
if (kBlk == NULL)
continue;
uint32_t preCnt = 0;
if (barsList._rt_cursor == UINT_MAX)
preCnt = 0;
else
preCnt = barsList._rt_cursor + 1;
for (;;)
{
if (kBlk->_block->_size <= preCnt)
break;
WTSBarStruct& nextBar = kBlk->_block->_bars[preCnt];
uint64_t barTime = 199000000000 + nextBar.time;
if (barTime <= nowTime)
{
//如果不是后复权,则直接回调onbar
//如果是后复权,则将最新bar复权处理以后,添加到cache中,再回调onbar
if(barsList._factor == DBL_MAX)
{
_sink->on_bar(barsList._code.c_str(), barsList._period, &nextBar);
}
else
{
WTSBarStruct cpBar = nextBar;
cpBar.open *= barsList._factor;
cpBar.high *= barsList._factor;
cpBar.low *= barsList._factor;
cpBar.close *= barsList._factor;
barsList._bars.emplace_back(cpBar);
_sink->on_bar(barsList._code.c_str(), barsList._period, &barsList._bars[barsList._bars.size()-1]);
}
}
else
{
break;
}
preCnt++;
}
if (preCnt > 0)
barsList._rt_cursor = preCnt - 1;
}
}
}
//更新全部数据读取模块
if (_sink)
_sink->on_all_bar_updated(uTime);
_last_time = nowTime;
}
double WtDataReader::getAdjFactorByDate(const char* stdCode, uint32_t date /* = 0 */)
{
CodeHelper::CodeInfo cInfo = CodeHelper::extractStdCode(stdCode);
WTSCommodityInfo* commInfo = _base_data_mgr->getCommodity(cInfo._exchg, cInfo._product);
if (!commInfo->isStock())
return 1.0;
AdjFactor factor = { date, 1.0 };
std::string key = stdCode;
if (cInfo.isExright())
key = key.substr(0, key.size() - 1);
const AdjFactorList& factList = _adj_factors[key];
if (factList.empty())
return 1.0;
auto it = std::lower_bound(factList.begin(), factList.end(), factor, [](const AdjFactor& a, const AdjFactor&b) {
return a._date < b._date;
});
if(it == factList.end())
{
//找不到,则说明目标日期大于最后一条的日期,直接返回最后一条除权因子
return factList.back()._factor;
}
else
{
//如果找到了,但是命中的日期大于目标日期,则用上一条
//如果等于目标日期,则用命中这一条
if ((*it)._date > date)
it--;
return (*it)._factor;
}
}
const WtDataReader::AdjFactorList& WtDataReader::getAdjFactors(const char* code, const char* exchg, const char* pid)
{
char key[20] = { 0 };
sprintf(key, "%s.%s.%s", exchg, pid, code);
auto it = _adj_factors.find(key);
if (it == _adj_factors.end())
{
//By Wesley @ 2021.12.21
//如果没有复权因子,就从extloader按需读一次
if (_loader)
{
if(_sink) pipe_reader_log(_sink,LL_INFO, "No adjusting factors of {} cached, searching via extented loader...", key);
_loader->loadAdjFactors(this, key, [](void* obj, const char* stdCode, uint32_t* dates, double* factors, uint32_t count) {
WtDataReader* self = (WtDataReader*)obj;
AdjFactorList& fctrLst = self->_adj_factors[stdCode];
for (uint32_t i = 0; i < count; i++)
{
AdjFactor adjFact;
adjFact._date = dates[i];
adjFact._factor = factors[i];
fctrLst.emplace_back(adjFact);
}
//一定要把第一条加进去,不然如果是前复权的话,可能会漏处理最早的数据
AdjFactor adjFact;
adjFact._date = 19900101;
adjFact._factor = 1;
fctrLst.emplace_back(adjFact);
std::sort(fctrLst.begin(), fctrLst.end(), [](const AdjFactor& left, const AdjFactor& right) {
return left._date < right._date;
});
pipe_reader_log(self->_sink, LL_INFO, "{} items of adjusting factors of {} loaded via extended loader", count, stdCode);
});
}
}
return _adj_factors[key];
}