飞道的博客

干货丨时序数据库DolphinDB即时编译(JIT)详解

370人阅读  评论(0)

DolphinDB是高性能分布式时序数据库,内置了丰富的计算功能和强大多范式编程语言。为了能够提高DolphinDB脚本的执行效率,从1.01版本开始,DolphinDB支持即时编译(JIT)。

1 JIT简介

即时编译(英文: Just-in-time compilation, 缩写: JIT),又译及时编译或实时编译,是动态编译的一种形式,可提高程序运行效率。

通常程序有两种运行方式:编译执行和解释执行。编译执行在程序执行前全部翻译为机器码,特点是运行效率较高,以C/C++为代表。解释执行是由解释器对程序逐句解释并执行,灵活性较强,但是执行效率较低,以Python为代表。

即时编译融合了两者的优点,在运行时将代码翻译为机器码,可以达到与静态编译语言相近的执行效率。Python的第三方实现PyPy通过JIT明显改善了解释器的性能。绝大多数的Java实现都依赖JIT以提高代码的运行效率。

2 JIT在DolphinDB中的作用

DolphinDB的编程语言是解释执行,运行程序时首先对程序进行语法分析生成语法树,然后递归执行。在不能使用向量化的情况下,解释成本会比较高。这是由于DolphinDB底层由C++实现,脚本中的一次函数调用会转化为多次C++内的虚拟函数调用。for循环,while循环和if-else等语句中,由于要反复调用函数,十分耗时,在某些场景下不能满足实时性的需求。

DolphinDB中的即时编译功能显著提高了for循环,while循环和if-else等语句的运行速度,特别适合于无法使用向量化运算但又对运行速度有极高要求的场景,例如高频因子计算、实时流数据处理等。

在下面的例子中,我们对比使用和不使用JIT的情况下,do-while循环计算1到1000000之和100次所需要的时间。


  
  1. def sum_without_jit(v) {
  2. s = 0l
  3. i = 1
  4. n = size(v)
  5. do {
  6. s += v[i]
  7. i += 1
  8. } while(i <= n)
  9. return s
  10. }
  11. @jit
  12. def sum_with_jit(v) {
  13. s = 0l
  14. i = 1
  15. n = size(v)
  16. do {
  17. s += v[i]
  18. i += 1
  19. } while(i <= n)
  20. return s
  21. }
  22. vec = 1..1000000
  23. timer(100) sum_without_jit(vec) // 120552.740 ms
  24. timer(100) sum_with_jit(vec) // 290.065 ms
  25. timer(100) sum(vec) // 48.922 ms

不使用JIT的耗时是使用JIT的415倍,使用内置sum函数耗时1/7左右,这里内置函数比JIT快是因为JIT生成的代码中有很多检查NULL值的指令,内置的sum函数如果发现输入的array没有NULL值则会省略这一步操作。


  
  1. vec[ 100] = NULL
  2. timer( 100) sum(vec) // 118. 063 ms

如果加上NULL值,内置sum的速度是JIT的2.5倍左右,这是由于内置sum还进行了一些手动的展开优化。如果函数内涉及到更多的复杂计算,那么JIT的速度则会超过向量化运算,这个我们在下面会提到。

若任务可以使用向量化计算,视情况可以不使用JIT,但是在诸如如高频因子生成等实际应用中,如何把循环计算转化为向量化运算需要一定的技巧。

知乎上的一篇专栏中,我们展示了如何使用在DolphinDB中使用向量化运算,其中计算买卖信号的式子如下:

direction = (iif(signal>t1, 1h, iif(signal<t10, 0h, 00h)) - iif(signal<t2, 1h, iif(signal>t20, 0h, 00h))).ffill().nullFill(0h)

对于初学DolphinDB的人来说,需要了解iif函数才可写出以上语句。使用for循环改写以上语句则较为容易:


  
  1. @jit
  2. def calculate_with_jit(signal, n, t1, t10, t20, t2) {
  3. cur = 0
  4. idx = 0
  5. output = array(INT, n, n)
  6. for (s in signal) {
  7. if(s > t1) { // (t1, inf)
  8. cur = 1
  9. } else if(s >= t1 0) { // [t1 0, t1]
  10. if(cur == - 1) cur = 0
  11. } else if(s > t2 0) { // [t2 0, t1 0)
  12. cur = 0
  13. } else if(s >= t2) { // [t2, t2 0]
  14. if(cur == 1) cur = 0
  15. } else { // (-inf, t2)
  16. cur = - 1
  17. }
  18. output[idx] = cur
  19. idx += 1
  20. }
  21. return output
  22. }

把@jit去掉,得到不使用JIT的自定义函数calculate_without_jit。对比三种方法的耗时:


  
  1. n = 10000000
  2. t1= 60
  3. t10 = 50
  4. t20 = 30
  5. t2 = 20
  6. signal = rand( 100. 0, n)
  7. timer( 100) (iif(signal >t 1, 1h, iif(signal < t 10, 0h, 00h)) - iif(signal <t 2, 1h, iif(signal > t 20, 0h, 00h))).ffill().nullFill( 0h) // 41092. 019 ms
  8. timer( 100) calculate_with_jit(calculate, signal, size(signal), t 1, t 10, t 20, t 2) // 17075. 127 ms
  9. timer( 100) calculate_without_jit(signal, size(signal), t 1, t 10, t 20, t 2) // 1404406. 413 ms

本例中,使用JIT的速度向量化运算的2.4倍,是不用JIT的82倍。这里JIT的速度比向量化运算还要快,是因为向量化运算中调用了很多次DolphinDB的内置函数,产生了很多中间结果,

涉及到多次内存分配以及虚拟函数调用,而JIT生成的代码则没有这些额外的开销。

另外一种情况是,某些计算无法使用向量化,比如计算期权隐含波动率(implied volatility)时,需要使用牛顿法,无法使用向量化运算。这种情况下如果需要满足一定的实时性,可以选择使用DolphinDB的插件,亦可使用JIT。两者的区别在于,在任何场景下都可以使用插件,但是需要使用C++语言编写,比较复杂;JIT的编写相对而言较为容易,但是适用的场景较为有限。JIT的运行速度与使用C++插件的速度非常接近。

3 如何在DolphinDB中使用JIT

3.1 使用方法

DolphinDB目前仅支持对用户自定义函数进行JIT。只需在用户自定义函数之前的一行添加 @jit 的标识即可:


  
  1. @jit
  2. def myFunc( /* arguments */) {
  3. /* implementation */
  4. }

用户在调用此函数时,DolphinDB会将函数的代码实时编译为机器码后执行。

3.2 支持的语句

目前DolphinDB支持在JIT中使用以下几种语句:

  • 赋值语句,例如:

  
  1. @jit
  2. def func() {
  3. y = 1
  4. }

请注意,multiple assign目前是不支持的,例如:


  
  1. @jit
  2. def func() {
  3. a, b = 1, 2
  4. }
  5. func()

运行以上语句会抛出异常。

  • return语句,例如:

  
  1. @jit
  2. def func() {
  3. return 1
  4. }
  • if-else语句,比如:

  
  1. @jit
  2. def myAbs(x) {
  3. if(x > 0) return x
  4. else return -x
  5. }
  • do-while语句,例如:

  
  1. @jit
  2. def mySqrt(x) {
  3. diff = 0. 0000001
  4. guess = 1.0
  5. guess = (x / guess + guess) / 2.0
  6. do {
  7. guess = (x / guess + guess) / 2.0
  8. } while(abs(guess * guess - x) >= diff)
  9. return guess
  10. }
  • for语句,例如:

  
  1. @jit
  2. def mySum(vec) {
  3. s = 0
  4. for(i in vec) {
  5. s += i
  6. }
  7. return s
  8. }

DolphinDB支持在JIT中以上语句的任意嵌套。

3.3 支持的运算符和函数

目前DolphinDB支持在JIT中使用以下的运算符:add(+), sub(-), multiply(*), divide(/), and(&&), or(||), bitand(&), bitor(|), bitxor(^), eq(==), neq(!=), ge(>=), gt(>), le(<=), lt(<), neg(-), mod(%), seq(..), at([]),以上运算在所有数据类型下的实现都与非JIT的实现一致。

目前DolphinDB支持在JIT中使用以下的数学函数: explogsinasincosacostanatanabsceilfloorsqrt。以上数学函数在JIT中出现时,

如果接受的参数为scalar,那么在最后生成的机器码中会调用glibc中对应的函数或者经过优化的C实现的函数;如果接收的参数为array,那么最后会调用DolphinDB

提供的数学函数。这样的好处是通过直接调用C实现的代码提升函数运行效率,减少不必要的虚拟函数调用和内存分配。

目前DolphinDB支持在JIT中使用以下的内置函数:takearraysizeisValidrand,cdfNormal

需要注意,array函数的第一个参数必须直接指定具体的数据类型,不能通过变量传递指定。这是由于JIT编译时必须知道所有变量的类型,而array函数返回结果的类型由第一个参数指定,因此编译时必须该值必须已知。

3.4 空值的处理

JIT中所有的函数和运算符处理空值的方法都与原生函数和运算符一致,即每个数据类型都用该类型的最小值来表示该类型的空值,用户不需要专门处理空值。

3.5 JIT函数之间的调用

DolphinDB的JIT函数可以调用另一个JIT函数。例如:


  
  1. @jit
  2. def myfunc1(x) {
  3. return sqrt(x) + exp(x)
  4. }
  5. @jit
  6. def myfunc2(x) {
  7. return myfunc1(x)
  8. }
  9. myfunc2( 1.5)

在上面的例子中,内部会先编译myfunc1, 生成一个签名为 double myfunc1(double) 的native函数,myfunc2生成的机器码中直接调用这个函数,而不是在运行时判断myfunc1是否为JIT函数后再执行,从而达到最高的执行效率。

请注意,JIT函数内不可以调用非JIT的用户自定义函数,因为这样无法进行类型推导。关于类型推导下面会提到。

3.6 JIT的编译成本以及缓存机制

DolphinDB的JIT底层依赖LLVM实现,每个用户自定义函数在编译时都会生成自己的module,相互独立。编译主要包含以下几个步骤:

  1. LLVM相关变量和环境的初始化
  2. 根据DolphinDB脚本的语法树生成LLVM的IR
  3. 调用LLVM优化第二步生成的IR,然后编译为机器码

以上步骤中第一步耗时一般在5ms以内,后面两步的耗时与实际脚本的复杂度成正比,总体而言编译耗时基本上在50ms以内。

对于一个JIT函数以及一个参数类型组合,DolphinDB只会编译一次。系统会对JIT函数编译的结果进行缓存。系统根据用户调用一个JIT函数时提供的参数的数据类型得到一个对应的字符串,然后在一个哈希表中寻找这个字符串对应的编译结果,如果存在则直接调用;如果不存在则开始编译,并将编译结果保存到此哈希表中,然后执行。

对需要反复执行的任务,或者运行时间远超编译耗时的任务,JIT会显著提高运行速度。

3.7 局限

目前DolphinDB中JIT适用的场景还比较有限:

  1. 只支持用户自定义函数的JIT。
  2. 只接受scalar和array类型的参数,另外的类型如table, dict,pair, string, symbol等暂不支持。
  3. 不接受subarray作为参数。

4 类型推导

在使用LLVM生成IR之前,必须知道脚本中所有变量的类型,这个步骤就是类型推导。DolphinDB的JIT使用的类型推导方式是局部推导,比如:


  
  1. @jit
  2. def foo() {
  3. x = 1
  4. y = 1.1
  5. z = x + y
  6. return z
  7. }

通过 x = 1 确定x的类型是int;通过 y = 1.1 确定y的类型是 double;通过 z = x + y 以及上面推得的x和y的类型,确定z的类型也是double;通过 return z 确定foo函数的返回类型是double。

如果函数有参数的话,比如:


  
  1. @jit
  2. def foo(x) {
  3. return x + 1
  4. }

foo函数的返回类型就依赖于输入值x的类型。

上面我们提到了目前JIT支持的数据类型,如果函数内部出现了不支持的类型,或者输入的变量类型不支持,那么就会导致整个函数的变量类型推导失败,在运行时会抛出异常。例如:


  
  1. @jit
  2. def foo(x) {
  3. return x + 1
  4. }
  5. foo( 123) // 正常执行
  6. foo( "abc") // 抛出异常,因为目前不支持STRING
  7. foo( 1: 2) // 抛出异常,因为目前不支持pair
  8. foo(( 1 2, 3 4, 5 6)) // 抛出异常,因为目前不支持tuple
  9. @jit
  10. def foo(x) {
  11. y = cumprod(x)
  12. z = y + 1
  13. return z
  14. }
  15. foo( 1. .10) // 抛出异常,因为目前还不支持cumprod函数,不知道该函数返回的类型,导致类型推导失败

因此,为了能够正常使用JIT函数,用户应该避免在函数内或者参数中使用诸如tuple或string等还未支持的类型,不要使用尚不支持的函数。

5 实例

5.1 计算隐含波动率 (implied volatility)

上面提到过某些计算无法进行向量化运算,计算隐含波动率 (implied volatility)就是一个例子:


  
  1. @jit
  2. def GBlackScholes(future_price, strike, input_ttm, risk_rate, b_rate, input_vol, is_call) {
  3. ttm = input_ttm + 0.000000000000001;
  4. vol = input_vol + 0.000000000000001;
  5. d1 = (log(future_price/strike) + (b_rate + vol*vol/2) * ttm) / (vol * sqrt(ttm));
  6. d2 = d1 - vol * sqrt(ttm);
  7. if (is_call) {
  8. return future_price * exp((b_rate - risk_rate) * ttm) * cdfNormal(0, 1, d1) - strike * exp(-risk_rate*ttm) * cdfNormal(0, 1, d2);
  9. } else {
  10. return strike * exp(-risk_rate*ttm) * cdfNormal(0, 1, -d2) - future_price * exp((b_rate - risk_rate) * ttm) * cdfNormal(0, 1, -d1);
  11. }
  12. }
  13. @jit
  14. def ImpliedVolatility(future_price, strike, ttm, risk_rate, b_rate, option_price, is_call) {
  15. high= 5.0;
  16. low = 0.0;
  17. do {
  18. if (GBlackScholes(future_price, strike, ttm, risk_rate, b_rate, (high+low)/2, is_call) > option_price) {
  19. high = (high+low)/2;
  20. } else {
  21. low = (high + low) /2;
  22. }
  23. } while ((high-low) > 0.00001);
  24. return (high + low) /2;
  25. }
  26. @jit
  27. def test_jit(future_price, strike, ttm, risk_rate, b_rate, option_price, is_call) {
  28. n = size(future_price)
  29. ret = array(DOUBLE, n, n)
  30. i = 0
  31. do {
  32. ret[i] = ImpliedVolatility(future_price[i], strike[i], ttm[i], risk_rate[i], b_rate[i], option_price[i], is_call[i])
  33. i += 1
  34. } while(i < n)
  35. return ret
  36. }
  37. n = 100000
  38. future_price= take(rand(10.0,1)[0], n)
  39. strike_price= take(rand(10.0,1)[0], n)
  40. strike= take(rand(10.0,1)[0], n)
  41. input_ttm= take(rand(10.0,1)[0], n)
  42. risk_rate= take(rand(10.0,1)[0], n)
  43. b_rate= take(rand(10.0,1)[0], n)
  44. vol= take(rand(10.0,1)[0], n)
  45. input_vol= take(rand(10.0,1)[0], n)
  46. multi= take(rand(10.0,1)[0], n)
  47. is_call= take(rand(10.0,1)[0], n)
  48. ttm= take(rand(10.0,1)[0], n)
  49. option_price= take(rand(10.0,1)[0], n)
  50. timer(10) test_jit(future_price, strike, ttm, risk_rate, b_rate, option_price, is_call) // 2621.73 ms
  51. timer(10) test_non_jit(future_price, strike, ttm, risk_rate, b_rate, option_price, is_call) // 302714.74 ms

上面的例子中,ImpliedVolatility会调用GBlackScholes函数。函数test_non_jit可通过把test_jit定义之前的@jit去掉以获取。JIT版本test_jit运行速度是非JIT版本test_non_jit的115倍。

5.2 计算 Greeks

量化金融中经常使用Greeks进行风险评估,下面以Charm为例展示JIT的使用:


  
  1. @jit
  2. def myMax(a,b){
  3. if(a>b){
  4. return a
  5. } else{
  6. return b
  7. }
  8. }
  9. @jit
  10. def NormDist( x) {
  11. return cdfNormal( 0, 1, x);
  12. }
  13. @jit
  14. def ND( x) {
  15. return ( 1.0/ sqrt( 2*pi)) * exp(-( x* x)/ 2.0)
  16. }
  17. @jit
  18. def CalculateCharm(future_price, strike_price, input_ttm, risk_rate, b_rate, vol, multi, is_call) {
  19. day_year = 245.0;
  20. d1 = ( log(future_price/strike_price) + (b_rate + (vol*vol)/ 2.0) * input_ttm) / (myMax(vol, 0. 00001) * sqrt(input_ttm));
  21. d2 = d1 - vol * sqrt(input_ttm);
  22. if (is_call) {
  23. return - exp((b_rate - risk_rate) * input_ttm) * (ND(d1) * (b_rate/vol/ sqrt(input_ttm) - d2/ 2.0/input_ttm) + (b_rate-risk_rate) * NormDist(d1)) * future_price * multi / day_year;
  24. } else {
  25. return - exp((b_rate - risk_rate) * input_ttm) * (ND(d1) * (b_rate/vol/ sqrt(input_ttm) - d2/ 2.0/input_ttm) - (b_rate-risk_rate) * NormDist(-d1)) * future_price * multi / day_year;
  26. }
  27. }
  28. @jit
  29. def test_jit(future_price, strike_price, input_ttm, risk_rate, b_rate, vol, multi, is_call) {
  30. n = size(future_price)
  31. ret = array(DOUBLE, n, n)
  32. i = 0
  33. do {
  34. ret[i] = CalculateCharm(future_price[i], strike_price[i], input_ttm[i], risk_rate[i], b_rate[i], vol[i], multi[i], is_call[i])
  35. i += 1
  36. } while(i < n)
  37. return ret
  38. }
  39. def ND_validate( x) {
  40. return ( 1.0/ sqrt( 2*pi)) * exp(-( x* x)/ 2.0)
  41. }
  42. def NormDist_validate( x) {
  43. return cdfNormal( 0, 1, x);
  44. }
  45. def CalculateCharm_vectorized(future_price, strike_price, input_ttm, risk_rate, b_rate, vol, multi, is_call) {
  46. day_year = 245.0;
  47. d1 = ( log(future_price/strike_price) + (b_rate + pow(vol, 2)/ 2.0) * input_ttm) / (max(vol, 0. 00001) * sqrt(input_ttm));
  48. d2 = d1 - vol * sqrt(input_ttm);
  49. return iif(is_call,- exp((b_rate - risk_rate) * input_ttm) * (ND_validate(d1) * (b_rate/vol/ sqrt(input_ttm) - d2/ 2.0/input_ttm) + (b_rate-risk_rate) * NormDist_validate(d1)) * future_price * multi / day_year,- exp((b_rate - risk_rate) * input_ttm) * (ND_validate(d1) * (b_rate/vol/ sqrt(input_ttm) - d2/ 2.0/input_ttm) - (b_rate-risk_rate) * NormDist_validate(-d1)) * future_price * multi / day_year)
  50. }
  51. n = 1000000
  52. future_price= rand( 10.0,n)
  53. strike_price= rand( 10.0,n)
  54. strike= rand( 10.0,n)
  55. input_ttm= rand( 10.0,n)
  56. risk_rate= rand( 10.0,n)
  57. b_rate= rand( 10.0,n)
  58. vol= rand( 10.0,n)
  59. input_vol= rand( 10.0,n)
  60. multi= rand( 10.0,n)
  61. is_call= rand(true false,n)
  62. ttm= rand( 10.0,n)
  63. option_price= rand( 10.0,n)
  64. timer( 10) test_jit(future_price, strike_price, input_ttm, risk_rate, b_rate, vol, multi, is_call) // 1834.342 ms
  65. timer( 10) test_none_jit(future_price, strike_price, input_ttm, risk_rate, b_rate, vol, multi, is_call) // 224099.805 ms
  66. timer( 10) CalculateCharm_vectorized(future_price, strike_price, input_ttm, risk_rate, b_rate, vol, multi, is_call) // 3117.761 ms

上面是一个更加复杂的例子,涉及到更多的函数调用和更复杂的计算,JIT版本比非JIT版本快121倍左右,比向量化版本快0.7倍左右。

5.3 计算止损点 (stoploss)

在这篇知乎专栏中,我们展示了如何使用DolphinDB进行技术信号回测,下面我们用JIT来实现其中的stoploss函数:


  
  1. @jit
  2. def stoploss_JIT(ret, threshold) {
  3. n = ret.size()
  4. i = 0
  5. curRet = 1.0
  6. curMaxRet = 1.0
  7. indicator = take(true, n)
  8. do {
  9. indicator[i] = false
  10. curRet *= (1 + ret[i])
  11. if(curRet > curMaxRet) { curMaxRet = curRet }
  12. drawDown = 1 - curRet / curMaxRet;
  13. if(drawDown >= threshold) {
  14. i = n // break is not supported for now
  15. }
  16. i += 1
  17. } while(i < n)
  18. return indicator
  19. }
  20. def stoploss_no_JIT(ret, threshold) {
  21. n = ret.size()
  22. i = 0
  23. curRet = 1.0
  24. curMaxRet = 1.0
  25. indicator = take(true, n)
  26. do {
  27. indicator[i] = false
  28. curRet *= (1 + ret[i])
  29. if(curRet > curMaxRet) { curMaxRet = curRet }
  30. drawDown = 1 - curRet / curMaxRet;
  31. if(drawDown >= threshold) {
  32. i = n // break is not supported for now
  33. }
  34. i += 1
  35. } while(i < n)
  36. return indicator
  37. }
  38. def stoploss_vectorization(ret, threshold){
  39. cumret = cumprod(1+ret)
  40. drawDown = 1 - cumret / cumret.cummax()
  41. firstCutIndex = at(drawDown >= threshold).first() + 1
  42. indicator = take(false, ret.size())
  43. if(isValid(firstCutIndex) and firstCutIndex < ret.size())
  44. indicator[firstCutIndex: ] = true
  45. return indicator
  46. }
  47. ret = take(0.0008 -0.0008, 1000000)
  48. threshold = 0.10
  49. timer(10) stoploss_JIT(ret, threshold) // 58.674 ms
  50. timer(10) stoploss_no_JIT(ret, threshold) // 14622.142 ms
  51. timer(10) stoploss_vectorization(ret, threshold) // 151.884 ms

stoploss这个函数实际上只需要找到drawdown大于threshold的第一天,不需要把cumprod和cummax全部计算出来,因此用JIT实现的版本比向量化版本快了1.5倍左右,比非JIT版本快248倍左右。

如果数据中最后一天才要stoploss,那么JIT版本的速度会和向量化一样,但是远远比非JIT版本快。

6. 未来

在后续的版本中,我们计划逐步支持以下功能:

  1. 在for, do-while语句中支持break和continue。
  2. 支持dictionary等数据结构,支持string等数据类型。
  3. 支持更多的数学和统计类函数。
  4. 增强类型推导功能,能够识别更多DolphinDB内置函数返回值的数据类型。
  5. 支持在自定义函数中为输入参数,返回值和局部变量声明数据类型。

7 总结

DolphinDB推出了即时编译执行自定义函数的功能,显著提高了for循环,while循环和if-else等语句的运行速度,特别适合于无法使用向量化运算但又对运行速度有极高要求的场景,例如高频因子计算、实时流数据处理等。


转载:https://blog.csdn.net/qq_41996852/article/details/112002998
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场