Parallelisation and Python
So, let's build our example model. First, how would we build it without the parallelisation? Here's some code: model.py, agent.py, landscape.py.
Look through it and try to understand what it is doing. We've documented the code to help. Start with the class-level comments in model.py.
The Landscape only really keeps and calculates a map of the agent densities, but it does also give us a framework for a more sophisticated landscape than just a set of arrays, should we wish to expand the model. At the moment the model runs for a fixed number of iterations. As we have agents that move away from high-density areas with the idea that they spread out evenly, you could obviously replace the run and report sections in model.py with something that calculates the statistical difference between an even population and the current result stored in Landscape, and uses it in a stopping rule. Note that the density is kept in a 1D array, mainly because MPI send/recv commands work best with 1D arrays, and we wanted to be as MPI compliant as possible.
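As a flavour of what that might look like, here's a minimal sketch of such a stopping rule, assuming the densities are available as a flat list of numbers (the getdensities accessor and the tolerance value are illustrative, not part of the model code):

def should_stop(densities, tolerance=0.1):
    # Mean density if the agents were spread perfectly evenly.
    expected = sum(densities) / len(densities)
    # Root-mean-square deviation from that even spread.
    rmsd = (sum((d - expected) ** 2 for d in densities)
            / len(densities)) ** 0.5
    return rmsd < tolerance

The fixed-count loop in model.py would then become something like while not should_stop(landscape.getdensities()): instead.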
Run the code and see what it does. We'll now parallelise it. There's no need to edit the code – we'll give you a full copy at the end – just spend the time trying to understand the changes.
To parallelise the code, we need to think about two types of node (cores/processors, PCs, virtual machines, etc.): the node we want to see the results on (we'll call this node zero), and all the other nodes (which we'll call "worker nodes", and which are numbered one upwards). We want to get different things to happen on different nodes. The key thing to understand is that each node gets its own copy of the code (or a section of the code), but that it is all of that code: an identical copy, not just the bit that runs on that node. We control which bits of the code run on which node by finding out the number of the node the code is actually running on and, for example, using "if" statements to say that some things should only happen if the code is running on node zero.
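As a trivial sketch of the idea (assuming each copy of the code has been told its number in a variable called node, as ours will be shortly):

if node == 0:
    print("I'm node zero: I'll collect and report the results.")
else:
    print("I'm worker node", node)

Every copy runs this same fragment, but only one of them takes the first branch.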
In addition, we're going to restructure our code, dividing off the chunk that will be processed on any given node from the main program. Our model.py code will look like this:
class Model:

    def task(self, node, number_of_nodes, pipes):
        # if statements to set up the nodes with agents
        # loop iterations to move the agents

    def main(self):
        # Make various pipes to communicate between nodes
        # start each node with the pipes and get it to do task

if __name__ == "__main__":
    Model().main()
Pipes are what we'll use to send and receive data between the worker nodes and node zero.
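If you've not used pipes before, here's a minimal self-contained sketch of one in action (the worker function and the data are just for illustration): multiprocessing.Pipe() hands back the two ends of the pipe, and each end can send and recv ordinary Python objects.

import multiprocessing

def worker(conn):
    data = conn.recv()                 # Block until something arrives.
    conn.send([d * 2 for d in data])   # Send a reply back up the pipe.

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send([1, 2, 3])
    print(parent_conn.recv())          # Prints [2, 4, 6]
    p.join()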
The first thing we need to know is how many nodes we have. We're going to set this to the same as the number of cores on our machine. This will allow us to load balance the right number of agents across the nodes. The multiprocessing library supplies a way of finding out the number of cores in total. We need to make the following adjustment to the Model class:
import multiprocessing

class Model:

    def main(self):
        number_of_nodes = multiprocessing.cpu_count()
This imports the multiprocessing module, and finds out the number of CPU cores. Once we know the number of cores we're dealing with, we can make nodes, and pipes to connect the nodes to node zero, passing each node a copy of the pipes. Here's our "main" method:
def main(self):
    number_of_nodes = multiprocessing.cpu_count()
    processes = []
    parent_pipes = []
    child_pipes = []

    # Make the communication pipes.
    # Note that pipes have two ends (parent connection and child).
    # We want node zero to have one end, and each worker node
    # to have another.
    for i in range(1, number_of_nodes):
        parent_conn, child_conn = multiprocessing.Pipe()
        parent_pipes.append(parent_conn)
        child_pipes.append(child_conn)

    # Give node zero one end of the pipes and start it processing.
    p0 = multiprocessing.Process(group=None, target=self.task, name=None, args=(0, number_of_nodes, parent_pipes), kwargs={}, daemon=None)
    processes.append(p0)
    p0.start()

    # Give the other nodes the other ends, and start them processing.
    for i in range(1, number_of_nodes):
        p = multiprocessing.Process(group=None, target=self.task, name=None, args=(i, number_of_nodes, child_pipes), kwargs={}, daemon=None)
        processes.append(p)
        p.start()

    # Wait for all processes to finish before exiting.
    for p in processes:
        p.join()
The key piece of code is:

p0 = multiprocessing.Process(group=None, target=self.task, name=None, args=(0, number_of_nodes, parent_pipes), kwargs={}, daemon=None)

The chief parts of this are invoking the code to run on each node (target=self.task) and passing it arguments, in this case including the relevant pipe ends (args=(0, number_of_nodes, parent_pipes)). If you look at the code, you'll see we make node zero and the worker nodes separately, passing the parent ends of the pipes to node zero, and the child ends to the worker nodes. Incidentally, this is why the if __name__ == "__main__": guard in the outline earlier matters: on platforms where new processes are started by re-importing the module, the guard stops each child process from trying to run main() again itself.
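For reference, here's a minimal sketch of how those arguments arrive at the other end; the parameter names match our outline, but the body is just placeholder comments (the real task code is built up below):

def task(self, node, number_of_nodes, pipes):
    # node            -- this process's number: 0 for node zero,
    #                    1 upwards for the worker nodes.
    # number_of_nodes -- the total count, used for load balancing.
    # pipes           -- parent ends on node zero; child ends on workers.
    pass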
We can now make our task code node-specific, for example by telling it to report only on node zero. Remember that every node will get a copy of the task code (indeed, on multiple machines, every machine would get a copy of the whole code):
# Report
if node == 0:
    print("time = ", time, " -------------------")
    landscape.setdensities(densities)
    for x in range(width):
        for y in range(height):
            print(landscape.getdensityat(x, y), end=" ")
        print("")
Even though the task function will run on all nodes, the Report section only runs on node zero. Neat, huh?
Next we'll divide up the agents between the nodes.