您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

利用“写时复制”功能将数据复制到Multiprocessing.Pool()工作进程

利用“写时复制”功能将数据复制到Multiprocessing.Pool()工作进程

发送到pool.map(以及相关方法)的所有内容实际上都没有使用共享的写时复制资源。这些值被“ pickled”(Python的序列化机制),通过管道发送到工作进程,然后在其中取消pick,从而从头开始在子进程中重建对象。因此,在这种情况下,每个孩子最终都会得到原始数据的写时复制版本(它从未使用过,因为被告知要使用通过IPC发送的副本),并且个人重新获得了原始数据。在孩子中重建,不共享。

如果要利用分叉的写时复制优势,则不能通过管道发送数据(或引用数据的对象)。您必须将它们存储在可以通过访问其自己的全局变量从孩子那里找到的位置。因此,例如:

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = list(range(10))  # Don't pass self
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print result

    def analyze(self, i):
        time.sleep(10)
        return i ** 2

def call_method(i):
    # Implicitly use global copy of my_instance, not one passed as an argument
    return my_instance.analyze(i)

# Constructed globally and unconditionally, so the instance exists
# prior to forking in commonly accessible location
my_instance = MyClass()


if __name__ == '__main__':
    my_instance.my_multithreaded_analysis()
@H_502_9@

通过不传递self,可以避免进行复制,而仅使用写时复制映射到子对象的单个全局对象。如果需要多个对象,则可以在创建池之前对对象的实例进行全局listdict映射,然后将可以将对象作为参数的一部分的索引或键传递给pool.map。然后,worker函数使用索引/键(必须通过IPC对其进行腌制并将其发送给子代)在全局dict(还包括写时复制映射)中查找值(写时复制映射),因此,您可以复制便宜的信息以在孩子中查找昂贵的数据而不进行复制。

解决方法

我有multiprocessing一些看起来像这样的Python代码:

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = [(self,i) for i in range(10)]
        pool = Pool(processes=10)
        result = pool.map(call_method,arg_lists)
        print result

    def analyze(self,i):
        time.sleep(10)
        return i ** 2

def call_method(args):
    my_instance,i = args
    return my_instance.analyze(i)


if __name__ == '__main__':
    my_instance = MyClass()
    my_instance.my_multithreaded_analysis()

在阅读了有关内存在其他StackOverflow答案中的工作方式的答案(例如该Python多处理内存使用情况)后,我的印象是,这种内存使用方式与我用于多处理的进程数量成比例,因为它是写时复制和我尚未修改的任何属性my_instance。但是,运行顶部时,我确实会看到所有进程的高内存,它表示我的大多数进程正在使用大量内存(这是OSX的最高输出,但是我可以在Linux上复制)。

我的问题基本上是,我的解释MyClass实际上在整个池中重复了吗,我是否正确地解释了这一点?如果是这样,我该如何预防呢?我应该不使用这样的结构吗?我的目标是减少用于计算分析的内存使用量。

PID   COMMAND      %CPU  TIME     #TH    #WQ  #PORT MEM    PURG   CMPRS  PGRP PPID STATE
2494  Python       0.0   00:01.75 1      0    7     765M   0B     0B     2484 2484 sleeping
2493  Python       0.0   00:01.85 1      0    7     765M   0B     0B     2484 2484 sleeping
2492  Python       0.0   00:01.86 1      0    7     765M   0B     0B     2484 2484 sleeping
2491  Python       0.0   00:01.83 1      0    7     765M   0B     0B     2484 2484 sleeping
2490  Python       0.0   00:01.87 1      0    7     765M   0B     0B     2484 2484 sleeping
2489  Python       0.0   00:01.79 1      0    7     167M   0B     597M   2484 2484 sleeping
2488  Python       0.0   00:01.77 1      0    7     10M    0B     755M   2484 2484 sleeping
2487  Python       0.0   00:01.75 1      0    7     8724K  0B     756M   2484 2484 sleeping
2486  Python       0.0   00:01.78 1      0    7     9968K  0B     755M   2484 2484 sleeping
2485  Python       0.0   00:01.74 1      0    7     171M   0B     594M   2484 2484 sleeping
2484  Python       0.1   00:16.43 4      0    18    775M   0B     12K    2484 2235 sleeping
喜欢与人分享编程技术与工作经验,欢迎加入编程之家官方交流群!
import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = [(self,i) for i in range(10)]
        pool = Pool(processes=10)
        result = pool.map(call_method,arg_lists)
        print result

    def analyze(self,i):
        time.sleep(10)
        return i ** 2

def call_method(args):
    my_instance,i = args
    return my_instance.analyze(i)


if __name__ == '__main__':
    my_instance = MyClass()
    my_instance.my_multithreaded_analysis()
PID   COMMAND      %CPU  TIME     #TH    #WQ  #PORT MEM    PURG   CMPRS  PGRP PPID STATE
2494  Python       0.0   00:01.75 1      0    7     765M   0B     0B     2484 2484 sleeping
2493  Python       0.0   00:01.85 1      0    7     765M   0B     0B     2484 2484 sleeping
2492  Python       0.0   00:01.86 1      0    7     765M   0B     0B     2484 2484 sleeping
2491  Python       0.0   00:01.83 1      0    7     765M   0B     0B     2484 2484 sleeping
2490  Python       0.0   00:01.87 1      0    7     765M   0B     0B     2484 2484 sleeping
2489  Python       0.0   00:01.79 1      0    7     167M   0B     597M   2484 2484 sleeping
2488  Python       0.0   00:01.77 1      0    7     10M    0B     755M   2484 2484 sleeping
2487  Python       0.0   00:01.75 1      0    7     8724K  0B     756M   2484 2484 sleeping
2486  Python       0.0   00:01.78 1      0    7     9968K  0B     755M   2484 2484 sleeping
2485  Python       0.0   00:01.74 1      0    7     171M   0B     594M   2484 2484 sleeping
2484  Python       0.1   00:16.43 4      0    18    775M   0B     12K    2484 2235 sleeping

通过不传递self,可以避免进行复制,而仅使用写时复制映射到子对象的单个全局对象。如果需要多个对象,则可以在创建池之前对对象的实例进行全局listdict映射,然后将可以将对象作为参数的一部分的索引或键传递给pool.map。然后,worker函数使用索引/键(必须通过IPC对其进行腌制并将其发送给子代)在全局dict(还包括写时复制映射)中查找值(写时复制映射),因此,您可以复制便宜的信息以在孩子中查找昂贵的数据而不进行复制。

我有multiprocessing一些看起来像这样的Python代码:

在阅读了有关内存在其他StackOverflow答案中的工作方式的答案(例如该Python多处理内存使用情况)后,我的印象是,这种内存使用方式与我用于多处理的进程数量成比例,因为它是写时复制和我尚未修改的任何属性my_instance。但是,运行顶部时,我确实会看到所有进程的高内存,它表示我的大多数进程正在使用大量内存(这是OSX的最高输出,但是我可以在Linux上复制)。

我的问题基本上是,我的解释MyClass实际上在整个池中重复了吗,我是否正确地解释了这一点?如果是这样,我该如何预防呢?我应该不使用这样的结构吗?我的目标是减少用于计算分析的内存使用量。

通过不传递self,可以避免进行复制,而仅使用写时复制映射到子对象的单个全局对象。如果需要多个对象,则可以在创建池之前对对象的实例进行全局listdict映射,然后将可以将对象作为参数的一部分的索引或键传递给pool.map。然后,worker函数使用索引/键(必须通过IPC对其进行腌制并将其发送给子代)在全局dict(还包括写时复制映射)中查找值(写时复制映射),因此,您可以复制便宜的信息以在孩子中查找昂贵的数据而不进行复制。

其他 2022/1/1 18:35:35 有468人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶