![]() ![]() If you remove process-safety by commenting the corresponding _iadd_ or _isub_ of Decorator then the program will only terminate by chance (e.g. The program is guaranteed to terminate when += and -= are process-safe. Setattr(subject, args, getattr(subject, args) - args) Setattr(subject, args, getattr(subject, args) + args) Response_queues.put(getattr(subject, args)) Print(sender, 'requested', action, *args) ![]() Sender, action, *args = request_queue.get() Target=worker, args=(proxy,), name=sender) Proxy = Proxy(request_queue, response_queues) Response_queues = multiprocessing.Queue() Then, we define the function worker that will be run in the child processes and request the increment and decrement operations: def worker(proxy):įinally, we define a single request queue to send requests to the parent process, and multiple response queues to send responses to the child processes: if _name_ = '_main_': proxy.x += value is equivalent to proxy.x = proxy.x._iadd_(value) which is equivalent to proxy.x = type(proxy).x._get_(proxy)._iadd_(value) which is equivalent to type(proxy).x._set_(proxy, type(proxy).x._get_(proxy)._iadd_(value))): class Decorator(int): The increment and decrement operators += and -= call the corresponding augmented assignment special methods _iadd_ and _isub_ if they are defined, and fall back on the assignment special methods _add_ and _sub_ which are always defined for int objects (e.g. Then, we define the class Decorator to decorate the int objects returned by the getters of a Proxy object in order to inform its setters whether the increment or decrement operators += and -= have been used by adding an action attribute, in which case the setters request an 'increment' or 'decrement' operation instead of a 'set' operation. Self._request_queue.put((sender, action, target, x(self): Return Decorator(self._response_queue.get()) Self._request_queue.put((sender, 'get', target)) Sender = multiprocessing.current_process().name Responses are of the form value (to 'get' requests): class Proxy(Subject):ĭef _init_(self, request_queue, response_queue): Requests are of the form (sender, action, *args) where sender is the sender name, action is the action name ( 'get', 'set', 'increment', or 'decrement' the value of an attribute), and args is the argument tuple. The interprocess communication will use two multiprocessing.Queue attributes, one for exchanging requests and one for exchanging responses. Next, we define a class Proxy for instantiating an object that will be the remote proxy through which the child processes will request the parent process to retrieve or update the attributes of the Subject object. It uses message passing with multiprocessing.Queue objects (instead of shared memory with multiprocessing.Value objects) and process-safe (atomic) built-in increment and decrement operators += and -= (instead of introducing custom increment and decrement methods) since you asked for it.įirst, we define a class Subject for instantiating an object that will be local to the parent process and whose attributes are to be incremented or decremented: import multiprocessing Here is a solution to your problem based on a different approach from that proposed in the other answers. I = p.map_async(analyze_data, inputs, chunksize = 1) P = Pool(initializer = init, initargs = (counter, )) # create the pool of workers, ensuring each one receives the counter # initialize a cross-process counter and the input lists # += operation is not atomic, so we need to get a lock: ''' increment the global counter, do something with the input ''' Note it uses global values which I would really try to avoid in practice: from multiprocessing import Pool, Value ![]() Here's a working version of your example (with some dummy input data). In your case you might want to share a Value instance between your workers ![]() See this section of the documentation for some techniques you can employ to share state between your processes. The problem is that the counter variable is not shared between your processes: each separate process is creating it's own local instance and incrementing that. ![]()
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. ArchivesCategories |