对加密货币盘口与逐笔交易数据的回放展示,可帮助量化研究人员检验量化策略,也有助于交易员复盘,加深对市场的洞察。DolphinDB可实现盘口和逐笔交易数据的高速回放,以及对回放结果逐点查询。
DolphinDB支持将多个分布式表同步回放并发布到流数据表,例如对盘口和交易这两个表进行同步回放。前端JavaScript使用DolphinDB Web API来轮询回放输出的流数据表,实现盘口和交易数据的可视化回放。DolphinDB自带Web服务器,整个流程可在DolphinDB内完成,无外部依赖。
加密货币盘口与逐笔交易数据回放可通过以下4个步骤来实现。用户亦可使用docker快速体验回放功能,具体请参考文末介绍。
1. 部署DolphinDB节点
下载DolphinDB最新版本,并部署集群。部署教程请参考单服务器集群部署教程。
2. 下载盘口和逐笔交易数据
本文使用的是火币研究院提供的加密货币交易数据,可以通过火币数据API获取。获取数据的示例代码可以参考python示例代码或java示例代码。
3. 导入数据到DolphinDB
本文将获取的orderBook的tick级数据保存为csv文件,通过loadTextEx函数快速地将文件导入到数据库。用户也可以通过Python API或Java API将数据导入到DolphinDB中。以下代码在DolphinDB GUI中执行。
(1)数据预处理
如果保存的csv文件中第一行是无关信息,可以采用下面脚本进行数据预处理,处理好的文件保存到某个目录,本案例将两个文件分别保存到/hdd/data/orderBook-processed和/hdd/data/tick-processes目录。如果csv文件第一行没有无关信息,可忽略这一步骤。
-
//删除数据文件第一行无关信息
-
def dataPreProcess(DIR){
-
if(!exists(DIR+
"-processed/"))
-
mkdir(DIR+
"-processed/")
-
fileList =
exec filename from files(DIR) where isDir =
false, filename like
"%.csv"
-
for(filename
in fileList){
-
f = file(DIR +
"/" + filename)
-
y = f.readLines(
1000000).removeHead!(
1)
-
saveText(y, DIR+
"-processed/" + filename)
-
}
-
}
-
dataPreProcess(
"/hdd/data/orderBook")
-
dataPreProcess(
"/hdd/data/tick")
(2)创建DolphinDB数据库
根据数据量以及查询字段,数据库可按照交易标的代码和业务时间进行组合分区。本案例中,数据库的名称为dfs://huobiDB。如果需要修改,必须同时修改replay.html中数据库的名称。
-
def createDB(){
-
if(existsDatabase(
"dfs://huobiDB"))
-
dropDatabase(
"dfs://huobiDB")
-
//按照数据集的时间跨度,请自行调整VALUE分区日期范围
-
db1 = database(, VALUE,
2018.09
.01.
.2018
.09
.30)
-
db2 = database(, HASH, [SYMBOL,
20])
-
db = database(
"dfs://huobiDB", COMPO, [db1,db2])
-
}
-
-
def createTick(){
-
tick = table(
100:
0, `aggregate_ID`server_time`price`amount`buy_or_sell`first_trade_ID`last_trade_ID`product , [
INT,TIMESTAMP,
DOUBLE,
DOUBLE,CHAR,
INT,
INT,SYMBOL])
-
db = database(
"dfs://huobiDB")
-
return db.createPartitionedTable(tick, `tick, `server_time`product)
-
}
-
-
def createOrderBook(){
-
orderData = table(
100:
0, `lastUpdateId`server_time`buy_1_price`buy_2_price`buy_3_price`buy_4_price`buy_5_price`buy_6_price`buy_7_price`buy_8_price`buy_9_price`buy_10_price`buy_11_price`buy_12_price`buy_13_price`buy_14_price`buy_15_price`buy_16_price`buy_17_price`buy_18_price`buy_19_price`buy_20_price`sell_1_price`sell_2_price`sell_3_price`sell_4_price`sell_5_price`sell_6_price`sell_7_price`sell_8_price`sell_9_price`sell_10_price`sell_11_price`sell_12_price`sell_13_price`sell_14_price`sell_15_price`sell_16_price`sell_17_price`sell_18_price`sell_19_price`sell_20_price`buy_1_amount`buy_2_amount`buy_3_amount`buy_4_amount`buy_5_amount`buy_6_amount`buy_7_amount`buy_8_amount`buy_9_amount`buy_10_amount`buy_11_amount`buy_12_amount`buy_13_amount`buy_14_amount`buy_15_amount`buy_16_amount`buy_17_amount`buy_18_amount`buy_19_amount`buy_20_amount`sell_1_amount`sell_2_amount`sell_3_amount`sell_4_amount`sell_5_amount`sell_6_amount`sell_7_amount`sell_8_amount`sell_9_amount`sell_10_amount`sell_11_amount`sell_12_amount`sell_13_amount`sell_14_amount`sell_15_amount`sell_16_amount`sell_17_amount`sell_18_amount`sell_19_amount`sell_20_amount`product,[
INT,TIMESTAMP,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,
DOUBLE,SYMBOL])
-
db = database(
"dfs://huobiDB")
-
return db.createPartitionedTable(orderData, `orderBook, `server_time`product)
-
}
(3)将文本数据导入数据库
-
def loadTick(
path, filename, mutable tb){
-
tmp = filename.split(
"_")
-
product = tmp[
1]
-
file =
path +
"/" + filename
-
t = loadText(file)
-
t[`product]=product
-
tb.append!(t)
-
}
-
-
def loopLoadTick(mutable tb,
path){
-
fileList = exec filename from files(
path,
"%.csv")
-
for(filename
in fileList){
-
print filename
-
loadTick(
path, filename, tb)
-
}
-
}
-
-
-
def loadOrderBook(
path, filename, mutable tb){
-
tmp = filename.split(
"_")
-
product = tmp[
1]
-
file =
path +
"/" + filename
-
t = loadText(file)
-
t[`product] = product
-
tb.append!(t)
-
}
-
-
def loopLoadOrderBook(mutable tb,
path){
-
fileList = exec filename from files(
path,
"%.csv")
-
for(filename
in fileList){
-
print filename
-
loadOrderBook(
path, filename, tb)
-
}
-
}
-
-
login(
"admin",
"123456")
-
tb = createOrderBook()
-
loopLoadOrderBook(tb,
"/hdd/data/orderBook-processed")
-
tb = createTick()
-
loopLoadTick(tb,
"/hdd/data/tick-processed")
(4)定义数据回放函数
-
def replayData(productCode, startTime, length, rate){
-
login(
'admin',
'123456');
-
tick = loadTable(
'dfs:
//huobiDB', 'tick');
-
orderbook = loadTable(
'dfs:
//huobiDB', 'orderBook');
-
-
schTick = select name,typeString as
type from tick.schema().colDefs;
-
schOrderBook = select name,typeString as
type from orderbook.schema().colDefs;
-
-
share(streamTable(
100:
0, schOrderBook.name, schOrderBook.
type), `outOrder);
-
share(streamTable(
100:
0, schTick.name, schTick.
type), `outTick);
-
enableTablePersistence(objByName(`outOrder),
true,
true,
100000);
-
enableTablePersistence(objByName(`outTick),
true,
true,
100000);
-
clearTablePersistence(objByName(`outOrder));
-
clearTablePersistence(objByName(`outTick));
-
-
share(streamTable(
100:
0, schOrderBook.name, schOrderBook.
type), `outOrder);
-
share(streamTable(
100:
0, schTick.name, schTick.
type), `outTick);
-
enableTablePersistence(objByName(`outOrder),
true,
true,
100000);
-
enableTablePersistence(objByName(`outTick),
true,
true,
100000);
-
-
endTime = temporalAdd(startTime, length,
"m")
-
sqlTick = sql(sqlCol(
"*"), tick, [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]);
-
sqlOrder = sql(sqlCol(
"*"), orderbook, [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]);
-
cutCount = length *
60 /
20
-
trs = cutPoints(timestamp(startTime..endTime), cutCount);
-
rds = replayDS(sqlTick, `server_time , , trs);
-
rds2 = replayDS(sqlOrder, `server_time , , trs);
-
return submitJob(
'replay_huobi',
'replay_huobi', replay, [rds,rds2], [`outTick,`outOrder],`server_time ,, rate);
-
}
-
-
addFunctionView(replayData);
4. 数据回放
下载数据回放界面的html压缩包,下载地址:https://github.com/dolphindb/applications/raw/master/cryptocurr_replay/replay.zip。将replay.zip解压到DolphinDB程序包的web目录。
在浏览器地址栏中输入http://[host]:[port]/replay.html打开数据回放界面。这里的host和port是指数据节点的IP地址和端口号,如http://192.168.1.135:8902/replay.html。
数据回放前,我们可以设置以下参数:
- Product:加密货币代码
- Replay Rate:回放速度,即每秒钟回放的记录数。如果市场每秒钟产生100笔交易,Replay Rate设置为1000就以10倍的速度回放。
- Start Time:数据的起始时间
- Length:数据的时间跨度,单位是分钟。如果Start Time设置为2018.09.17 00:00:00,Length设置为60,表示回放的数据是在2018.09.17 00:00:00-2018.09.17 00:59:59之间产生的。
回放结束后,点击左上角正方形图标按钮(“结束”按钮)。单击价格趋势图中的点,表格中会显示该时间点之前的10笔数据。具体操作请查看图片https://raw.githubusercontent.com/dolphindb/Tutorials_CN/master/images/replay/v.gif。
使用docker快速体验回放功能
我们提供了一个包含DolphinDB Server以及演示数据的docker容器,并打包成tar文件提供下载。用户仅需要安装docker环境,下载打包文件,运行下面的命令就可以快速完成演示环境部署。
tar文件下载地址:https://www.dolphindb.cn/downloads/cryptocurr_replay.tar.gz
-
gunzip cryptocurr_replay.tar.gz
-
##docker若没有赋予非管理访问权限,可以使用 sudo docker
-
docker import cryptocurr_replay.tar ddb/replay:v
1
-
##生成并启动容器
-
docker run -dt -p
8888:
8848 --name replay
1 ddb/replay:v
1 /bin/bash /dolphindb/start.sh
启动容器后,docker内 DolphinDB database 的访问端口被映射到宿主机8888端口,打开浏览器访问http://[宿主机ip]:8888/replay.html
,进入到回放演示界面。
为了控制docker容器大小,方便下载,演示数据仅包含2018.09.17一天的加密货币编号为ETHUSDT,ETHBTC,BTCUSDT
的交易数据。
注意:因为内置的license文件会过期, 需要从官网下载最新的社区版license替换。下载社区版解压后进入server目录下,拷贝dolphindb.lic文件覆盖docker中的/dolphindb/目录下同名文件。
sudo docker cp ./dolphindb.lic replay1:/dolphindb/dolphindb.lic
重启docker:
sudo docker restart replay1
注意事项
- 本案例当前仅限单用户使用,不支持多用户同时回放。
- 为了简化操作,数据库名称和数据库用户信息均固化在网页中,若有需要请自行修改replay.html文件
转载:https://blog.csdn.net/qq_41996852/article/details/111191554