We're working on a concurrency problem where a single "Sample" object has multiple dependent tasks that must be executed in stages. For instance, stage 1 has tasks (1a, 1b), stage 2 has tasks (2a, 2b), and so on. Each stage can only begin after all tasks in the previous stage complete.

When running in a single thread, we rely on the mutability of the Sample and its child objects to keep track of which tasks have finished—i.e., if 1a and 1b are marked as complete, then we trigger stage 2. However, in a multi-processing context, these references get pickled and passed to each worker. That means each task operates on a copy of the Sample rather than a shared mutable reference. Once the tasks complete, we're left with multiple copies whose state we have to reconcile manually.

I'd like to know:

  • Best practices for orchestrating dependent tasks so that when all tasks in stage 1 are finished, I can start stage 2 without losing track of what’s done.
  • How to avoid the “lost mutability” problem, where each process modifies a copy and I need to merge them back. Are there recommended patterns or data structures (like multiprocessing.Manager or some form of shared memory) that make this simpler?
  • How to handle the scenario where each task modifies the same sample object but we only want final, aggregated results in one place.

Below is a simplified code example. In real code, each task modifies the Sample's internal data, but as soon as we use ProcessPoolExecutor, the Sample object’s references become disconnected copies.

import concurrent.futures

class Sample:
    def __init__(self, sample_id):
        self.sample_id = sample_id
        # For illustration, let's track stages like {'1': [False, False], '2': [False, False], ...}
        self.stage_completion = {
            '1': [False, False],
            '2': [False, False],
            '3': [False, False]
        }
    
    def do_task(self, stage, sub_idx):
        # Do some work here
        print(f"Doing {stage}{sub_idx} for sample {self.sample_id}")
        self.stage_completion[stage][sub_idx] = True
        return self  # Return self for convenience

def run_task(sample_obj, stage, sub_idx):
    return sample_obj.do_task(stage, sub_idx)

def main():
    sample = Sample(sample_id=123)

    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Submit tasks 1a and 1b (equivalent to stage '1' indexes [0, 1])
        future1 = executor.submit(run_task, sample, '1', 0)
        future2 = executor.submit(run_task, sample, '1', 1)
        
        # Wait for them to finish
        result1 = future1.result()
        result2 = future2.result()
        
        # Now I'd like to check if stage 1 is fully done before scheduling stage 2
        # But result1 and result2 are separate copies with their own state
        # This is where merging states or having a centralized tracking is tricky
        print("Stage 1 results from result1:", result1.stage_completion)
        print("Stage 1 results from result2:", result2.stage_completion)

if __name__ == "__main__":
    main()
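
For concreteness, running this prints something like the following (the order of the worker prints may vary), since each worker mutated its own pickled copy of the Sample:

Doing 10 for sample 123
Doing 11 for sample 123
Stage 1 results from result1: {'1': [True, False], '2': [False, False], '3': [False, False]}
Stage 1 results from result2: {'1': [False, True], '2': [False, False], '3': [False, False]}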

As you can see, each returned Sample object might have a partial view of the overall state. I'd prefer a solution that keeps them in sync or merges them easily, without resorting to writing manual “merge functions” for every internal data structure.

What are the recommended design patterns or approaches in Python for managing (and ultimately reconciling) mutable state across parallel tasks so that I can coordinate dependent tasks without losing the shared object’s unified state? Tips, examples using multiprocessing, concurrent.futures, or a more appropriate library would be much appreciated.

We'd guess the easiest way is to store the objects in a separate database - but then all the calls to that database may make it slow...

asked Jan 29 at 11:28 by NanoNerd
  • Without knowing what other types of attributes are in your actual Sample instance, it's difficult to give a definitive answer as to what approach you should take. You need to update your question. – Booboo Commented Jan 30 at 13:26
  • In the code you posted you are submitting two tasks for the first stage and then waiting for them to complete. At this point by definition stage 1 has completed and you can submit tasks for the next stage (a sketch of this idea follows these comments). So it isn't clear to me why you even have a stage_completion attribute. – Booboo Commented Jan 30 at 13:35
  • You might want to look into the multiprocessing.shared_memory package in the standard library. – Paul Cornelius Commented Feb 3 at 4:32
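
Expanding on the second comment: a minimal sketch (task bodies are placeholders) in which the main process keeps the only copy of the bookkeeping and the futures themselves act as the barrier between stages:

import concurrent.futures

def run_task(sample_id, stage, sub_idx):
    # workers are stateless: do the work, then report back what was done
    print(f"Doing {stage}{sub_idx} for sample {sample_id}")
    return stage, sub_idx

def main():
    sample_id = 123
    stage_completion = {s: [False, False] for s in ('1', '2', '3')}

    with concurrent.futures.ProcessPoolExecutor() as executor:
        for stage in ('1', '2', '3'):
            futures = [executor.submit(run_task, sample_id, stage, i) for i in (0, 1)]
            # waiting on the futures is itself the "stage finished" barrier
            for fut in concurrent.futures.as_completed(futures):
                done_stage, done_idx = fut.result()
                stage_completion[done_stage][done_idx] = True
            # every task of this stage is now done; the next iteration may start

if __name__ == "__main__":
    main()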

1 Answer


You can share a single Python object between processes by hosting it in a manager process and handing each worker a proxy to it (multiprocessing.managers.BaseManager below).

The limitation is that, because the workers only ever hold a proxy, you need to add getters and setters for any nested attributes you want to read or modify.

import concurrent.futures
import multiprocessing.managers
import copy

class SharedSample:
    def __init__(self, sample_id):
        self.sample_id = sample_id
        # For illustration, let's track stages like {'1': [False, False], '2': [False, False], ...}
        self.stage_completion = {
            '1': [False, False],
            '2': [False, False],
            '3': [False, False]
        }

    def get_sample_id(self):
        return self.sample_id
    def set_stage_completion(self, stage, subidx, value):
        self.stage_completion[stage][subidx] = value

# register the type with the manager
multiprocessing.managers.BaseManager.register("SharedSample", SharedSample)

# this cannot be a method of SharedSample, otherwise it would execute
# inside the manager process instead of the worker
def do_task(sample, stage, sub_idx):
    # Do some work here
    print(f"Doing {stage}{sub_idx} for sample {sample.get_sample_id()}")
    sample.set_stage_completion(stage, sub_idx, True)
    return sample  # Return self for convenience


def run_task(sample_obj, stage, sub_idx):
    return do_task(sample_obj, stage, sub_idx)


def main():
    with multiprocessing.managers.BaseManager() as sampleManager:
        sample = sampleManager.SharedSample(sample_id=123)

        with concurrent.futures.ProcessPoolExecutor() as executor:
            # Submit tasks 1a and 1b (equivalent to stage '1' indexes [0, 1])
            future1 = executor.submit(run_task, sample, '1', 0)
            future2 = executor.submit(run_task, sample, '1', 1)

            # Wait for them to finish
            result1 = future1.result()
            result2 = future2.result()

            # copy.deepcopy on a proxy fetches a plain local copy of the object from the manager
            print("Stage 1 results from result1:", copy.deepcopy(result1).stage_completion)
            print("Stage 1 results from result2:", copy.deepcopy(result2).stage_completion)


if __name__ == "__main__":
    main()

Output:

Doing 10 for sample 123
Doing 11 for sample 123
Stage 1 results from result1: {'1': [True, True], '2': [False, False], '3': [False, False]}
Stage 1 results from result2: {'1': [True, True], '2': [False, False], '3': [False, False]}

Getters return a copy, not a live reference. Don't add a getter for stage_completion; if you do, call it get_stage_completion_deepcopy() so a future maintainer won't be tempted to assign to the result.
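
A minimal sketch of that getter pattern (the name follows the suggestion above and is purely illustrative); whatever a proxied method returns is pickled back to the caller, so the caller always gets a snapshot:

import multiprocessing.managers

class SharedSample:
    def __init__(self, sample_id):
        self.sample_id = sample_id
        self.stage_completion = {'1': [False, False]}

    def set_stage_completion(self, stage, sub_idx, value):
        self.stage_completion[stage][sub_idx] = value

    def get_stage_completion_deepcopy(self):
        # the return value is pickled on its way back to the caller,
        # so this is a snapshot, not a live view of the managed state
        return self.stage_completion

multiprocessing.managers.BaseManager.register("SharedSample", SharedSample)

if __name__ == "__main__":
    with multiprocessing.managers.BaseManager() as manager:
        sample = manager.SharedSample(sample_id=123)
        sample.set_stage_completion('1', 0, True)

        snapshot = sample.get_stage_completion_deepcopy()
        print(snapshot)            # {'1': [True, False]}

        snapshot['1'][1] = True    # only mutates the local copy...
        print(sample.get_stage_completion_deepcopy())  # ...still {'1': [True, False]}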


This could also be done with Manager().dict() and Manager().list(), but they are tricky to use: any nested object you read back is a copy, so mutating it in place does not update the shared state; people almost always end up with bugs by treating them like ordinary dicts and lists when they are not; and every access is a round trip to the manager, which adds a lot of latency, especially when the proxies are nested. A minimal sketch of that approach follows.
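
The sketch below (identifiers are illustrative) keeps only the completion flags in a flat Manager().dict keyed by (stage, sub_idx), which sidesteps the nested-proxy problem entirely:

import concurrent.futures
import multiprocessing

def run_task(completion, stage, sub_idx):
    # ... real work for this sub-task would go here ...
    completion[(stage, sub_idx)] = True   # the write goes through the manager proxy

def main():
    with multiprocessing.Manager() as manager:
        # flat keys, so we never mutate a nested list through the proxy
        completion = manager.dict({(s, i): False for s in '123' for i in (0, 1)})

        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(run_task, completion, '1', i) for i in (0, 1)]
            concurrent.futures.wait(futures)

        if all(completion[('1', i)] for i in (0, 1)):
            print("stage 1 complete, stage 2 can be submitted")

if __name__ == "__main__":
    main()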
