是否可以在每个元素取决于前一个元素的情况下对递归计算进行递归计算?
-
08-10-2019 - |
题
T(i) = Tm(i) + (T(i-1)-Tm(i))**(-tau(i))
Tm
和 tau
是先前已经计算出的相同长度的数字向量,并且希望创建一个新的向量 T
. 。这 i
仅包括表示所需内容的元素索引。
对于这种情况,是否需要循环?
解决方案
您可能会认为这会起作用:
import numpy as np
n = len(Tm)
t = np.empty(n)
t[0] = 0 # or whatever the initial condition is
t[1:] = Tm[1:] + (t[0:n-1] - Tm[1:])**(-tau[1:])
但事实并非如此:您实际上不能以这种方式以numpy进行递归(因为Numpy计算整个RHS,然后将其分配给LHS)。
因此,除非您可以提出此公式的非恢复版本,否则您会被露骨循环:
tt = np.empty(n)
tt[0] = 0.
for i in range(1,n):
tt[i] = Tm[i] + (tt[i-1] - Tm[i])**(-tau[i])
其他提示
在某些情况下,有可能具有这种递归 - 即如果有递归公式的numpy ufunc,例如
c = numpy.arange(10.)
numpy.add(c[:-1], c[1:], c[1:])
这计算累积总和 c
到位,使用输出参数 numpy.add
. 。它不能写为
c[1:] = c[:-1] + c[1:]
因为现在加法的结果是复制到 c[1:]
计算完成后。
现在最自然的事情是定义自己的ufunc:
def f(T, Tm, tau):
return Tm + (T - Tm)**(-tau)
uf = numpy.frompyfunc(f, 3, 1)
但是由于超出我的原因,以下情况无效:
uf(T[:-1], Tm[1:], tau[1:], T[1:])
显然,结果不是直接写给的 T[1:]
, ,但在操作完成后存放在临时并复制。即使有效,与普通循环相比,我也不会期望从此加快速度,因为它需要为每个条目调用Python功能。
如果您真的想避免使用Python循环,则可能需要去Cython或CTYPES。
我进行了一些基准,并在2018年使用 numba 是人们应该尝试在numpy中加速递归功能的第一个选择(ARONSTEF提案)。 Numba已经预装在Anaconda软件包中,并且具有最快的时间之一(比任何Python快20倍)。在2018年,Python支持@numba注释,没有其他步骤(至少版本3.6和3.7)。这是两个基准:一个在2018-10-20上进行,另一个在2016-05-18中进行。
而且,正如贾夫(Jaffe)提到的那样,在2018年,仍然不可能将递归功能矢量化。我检查了AronStef的矢量化,但它不起作用。
用执行时间排序的基准:
-----------------------------------
|Variant |2018-10 |2016-05 |
-----------------------------------
|Pure C | na | 2.75 ms|
|C extension | na | 6.22 ms|
|Cython float32 | 1.01 ms| na |
|Cython float64 | 1.05 ms| 6.26 ms|
|Fortran f2py | na | 6.78 ms|
|Numba float32 | 2.81 ms| na |
|(Aronstef) | | |
|Numba float64 | 5.28 ms| na |
|Append to list |48.2 ms|91.0 ms|
|Using a.item() |58.3 ms|74.4 ms|
|np.fromiter() |60.0 ms|78.1 ms|
|Loop over Numpy|71.9 ms|87.9 ms|
|(Jaffe) | | |
|Loop over Numpy|74.4 ms| na |
|(Aronstef) | | |
-----------------------------------
答案结束时提供了相应的代码。
我没有在2018年检查纯C,但我想它仍然是基于以前的基准测试的最快。
我也没有在2018年检查C扩展,我认为它与Cython几乎相同,基于先前的基准。
Fortran很难进行调试和编译,因此我没有在2018年检查F2PY版本。而且,这比Cython更糟。
我在2018年有以下设置:
Processor: Intel i7-7500U 2.7GHz
Versions:
Python: 3.7.0
Numba: 0.39.0
Cython: 0.28.5
Numpy: 1.15.1
推荐 numba 使用Float32(来自Aronstef)的代码:
@numba.jit("float32[:](float32[:], float32[:])", nopython=False, nogil=True)
def calc_py_jit32(Tm_, tau_):
tt = np.empty(len(Tm_),dtype="float32")
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i])
return tt[1:]
所有其他代码:
数据创建(例如Aronstef + Mike T注释):
np.random.seed(0)
n = 100000
Tm = np.cumsum(np.random.uniform(0.1, 1, size=n).astype('float64'))
tau = np.random.uniform(-1, 0, size=n).astype('float64')
ar = np.column_stack([Tm,tau])
Tm32 = Tm.astype('float32')
tau32 = tau.astype('float32')
Tm_l = list(Tm)
tau_l = list(tau)
2016年的代码略有不同,因为我使用ABS()函数来防止NAN,而不是Mike T的变体。在2018年,该功能与OP(原始海报)写的完全相同。
Cython Float32 使用jupyter %%魔术。该功能可以直接使用 Python
. 。 Cython需要一个C ++编译器,其中编译了Python。合适版本的Visual C ++编译器(用于Windows)的安装可能是有问题的:
%%cython
import cython
import numpy as np
cimport numpy as np
from numpy cimport ndarray
cdef extern from "math.h":
np.float32_t exp(np.float32_t m)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
@cython.initializedcheck(False)
def cy_loop32(np.float32_t[:] Tm,np.float32_t[:] tau,int alen):
cdef np.float32_t[:] T=np.empty(alen, dtype=np.float32)
cdef int i
T[0]=0.0
for i in range(1,alen):
T[i] = Tm[i] + (T[i-1] - Tm[i])**(-tau[i])
return T
Cython Float64 使用jupyter %%魔术。该功能可以直接使用 Python
:
%%cython
cdef extern from "math.h":
double exp(double m)
import cython
import numpy as np
cimport numpy as np
from numpy cimport ndarray
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
@cython.initializedcheck(False)
def cy_loop(double[:] Tm,double[:] tau,int alen):
cdef double[:] T=np.empty(alen)
cdef int i
T[0]=0.0
for i in range(1,alen):
T[i] = Tm[i] + (T[i-1] - Tm[i])**(-tau[i])
return T
numba float64:
@numba.jit("float64[:](float64[:], float64[:])", nopython=False, nogil=True)
def calc_py_jit(Tm_, tau_):
tt = np.empty(len(Tm_),dtype="float64")
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i])
return tt[1:]
附加列表. 。最快的非编译解决方案:
def rec_py_loop(Tm,tau,alen):
T = [Tm[0]]
for i in range(1,alen):
T.append(Tm[i] - (T[i-1] + Tm[i])**(-tau[i]))
return np.array(T)
使用a.item():
def rec_numpy_loop_item(Tm_,tau_):
n_ = len(Tm_)
tt=np.empty(n_)
Ti=tt.item
Tis=tt.itemset
Tmi=Tm_.item
taui=tau_.item
Tis(0,Tm_[0])
for i in range(1,n_):
Tis(i,Tmi(i) - (Ti(i-1) + Tmi(i))**(-taui(i)))
return tt[1:]
np.fromiter():
def it(Tm,tau):
T=Tm[0]
i=0
while True:
yield T
i+=1
T=Tm[i] - (T + Tm[i])**(-tau[i])
def rec_numpy_iter(Tm,tau,alen):
return np.fromiter(it(Tm,tau), np.float64, alen)[1:]
在numpy上循环(基于贾夫的想法):
def rec_numpy_loop(Tm,tau,alen):
tt=np.empty(alen)
tt[0]=Tm[0]
for i in range(1,alen):
tt[i] = Tm[i] - (tt[i-1] + Tm[i])**(-tau[i])
return tt[1:]
循环通过numpy(AronStef的代码)。 在我的电脑上 float64
是默认类型 np.empty
.
def calc_py(Tm_, tau_):
tt = np.empty(len(Tm_),dtype="float64")
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = (Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i]))
return tt[1:]
纯c 不使用 Python
根本。 2016年的版本(具有Fabs()函数):
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <windows.h>
#include <sys\timeb.h>
double randn() {
double u = rand();
if (u > 0.5) {
return sqrt(-1.57079632679*log(1.0 - pow(2.0 * u - 1, 2)));
}
else {
return -sqrt(-1.57079632679*log(1.0 - pow(1 - 2.0 * u,2)));
}
}
void rec_pure_c(double *Tm, double *tau, int alen, double *T)
{
for (int i = 1; i < alen; i++)
{
T[i] = Tm[i] + pow(fabs(T[i - 1] - Tm[i]), (-tau[i]));
}
}
int main() {
int N = 100000;
double *Tm= calloc(N, sizeof *Tm);
double *tau = calloc(N, sizeof *tau);
double *T = calloc(N, sizeof *T);
double time = 0;
double sumtime = 0;
for (int i = 0; i < N; i++)
{
Tm[i] = randn();
tau[i] = randn();
}
LARGE_INTEGER StartingTime, EndingTime, ElapsedMicroseconds;
LARGE_INTEGER Frequency;
for (int j = 0; j < 1000; j++)
{
for (int i = 0; i < 3; i++)
{
QueryPerformanceFrequency(&Frequency);
QueryPerformanceCounter(&StartingTime);
rec_pure_c(Tm, tau, N, T);
QueryPerformanceCounter(&EndingTime);
ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart;
ElapsedMicroseconds.QuadPart *= 1000000;
ElapsedMicroseconds.QuadPart /= Frequency.QuadPart;
if (i == 0)
time = (double)ElapsedMicroseconds.QuadPart / 1000;
else {
if (time > (double)ElapsedMicroseconds.QuadPart / 1000)
time = (double)ElapsedMicroseconds.QuadPart / 1000;
}
}
sumtime += time;
}
printf("1000 loops,best of 3: %.3f ms per loop\n",sumtime/1000);
free(Tm);
free(tau);
free(T);
}
Fortran f2py。 功能可以从中使用 Python
. 。 2016年(带有ABS()功能)的版本:
subroutine rec_fortran(tm,tau,alen,result)
integer*8, intent(in) :: alen
real*8, dimension(alen), intent(in) :: tm
real*8, dimension(alen), intent(in) :: tau
real*8, dimension(alen) :: res
real*8, dimension(alen), intent(out) :: result
res(1)=0
do i=2,alen
res(i) = tm(i) + (abs(res(i-1) - tm(i)))**(-tau(i))
end do
result=res
end subroutine rec_fortran
更新:21-10-2018我已经根据评论更正了答案。
只要计算不递归,就可以对向量进行矢量化操作。由于递归操作取决于先前的计算值,因此无法平行处理操作。因此,这行不通:
def calc_vect(Tm_, tau_):
return Tm_[1:] - (Tm_[:-1] + Tm_[1:]) ** (-tau_[1:])
由于(串行处理 /循环)是必要的,因此通过尽可能接近优化的机器代码来获得最佳性能,因此Numba和Cython是这里的最佳答案。
NUMBA方法的实现如下:
init_string = """
from math import pow
import numpy as np
from numba import jit, float32
np.random.seed(0)
n = 100000
Tm = np.cumsum(np.random.uniform(0.1, 1, size=n).astype('float32'))
tau = np.random.uniform(-1, 0, size=n).astype('float32')
def calc_python(Tm_, tau_):
tt = np.empty(len(Tm_))
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - pow(tt[i-1] + Tm_[i], -tau_[i])
return tt
@jit(float32[:](float32[:], float32[:]), nopython=False, nogil=True)
def calc_numba(Tm_, tau_):
tt = np.empty(len(Tm_))
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - pow(tt[i-1] + Tm_[i], -tau_[i])
return tt
"""
import timeit
py_time = timeit.timeit('calc_python(Tm, tau)', init_string, number=100)
numba_time = timeit.timeit('calc_numba(Tm, tau)', init_string, number=100)
print("Python Solution: {}".format(py_time))
print("Numba Soltution: {}".format(numba_time))
Python和NUMBA函数的时间表比较:
Python Solution: 54.58057559299999
Numba Soltution: 1.1389029540000024
为了建立NPE的回答,我同意必须在某个地方有一个循环。也许您的目标是避免与Python循环相关的间接费用?在这种情况下,numpy.fromiter确实击败了一个循环,但只有一点:
使用非常简单的递归关系,
x[i+1] = x[i] + 0.1
我明白了
#FOR LOOP
def loopit(n):
x = [0.0]
for i in range(n-1): x.append(x[-1] + 0.1)
return np.array(x)
#FROMITER
#define an iterator (a better way probably exists -- I'm a novice)
def it():
x = 0.0
while True:
yield x
x += 0.1
#use the iterator with np.fromiter
def fi_it(n):
return np.fromiter(it(), np.float, n)
%timeit -n 100 loopit(100000)
#100 loops, best of 3: 31.7 ms per loop
%timeit -n 100 fi_it(100000)
#100 loops, best of 3: 18.6 ms per loop
有趣的是,预先分配Numpy阵列会导致绩效巨大损失。这对我来说是一个谜,尽管我猜想,与访问阵列元素相比,与附加列表相比,必须有更多的间接费用。
def loopit(n):
x = np.zeros(n)
for i in range(n-1): x[i+1] = x[i] + 0.1
return x
%timeit -n 100 loopit(100000)
#100 loops, best of 3: 50.1 ms per loop