|
At its simplest, message passing (communication) comprises one process that is designed to send out a message, and another that is designed to receives it. Message passing using JavaSpaces is described as being loosely coupled in that senders and receivers communicate indirectly through the space. (The opposite of loose coupling is tight coupling.)
Channels (or streams) are "information conduits" that take a series of messages.
Synchronsisation is, of course, important when dealing with distributed systems that use some limitted but shared number of units of resource. Processes requiring these resources use these units if they are availble, if not such processes should wait till an appropriate unit becomes avaialble.
Semaphores are a mechansim for controlling units of resource. A semaphore, at its simplest, is an integer counter that represents the count of available units of resource. It is controlled by two operations:
A disadvantage of semaphores is that some processes, waiting for a resource, may be significantly less efficient at obtaining resources than others. In the worst case a process may entirely fail to obtain any resources --- a problem known as startvation. Two possible solutions to the startvation problem are:
A barrier is a point in a distributed computation which eavh process must reach prior to continuing on to the next phase of the computation. Typically a barrier comprises a shared variable which must equal the total number of processes before the next phase can be commenced. The variable is initialised with the value 0, as each process completes the first phase the varibale is incremebted.
When an entry is written into space it is serialized, i.e. it is turned into a series of bytes. Similarly when an entry is read from a space it is deserialized. There is thus a serialization overhead associated with writing, reading and taking entries.
If we are repeatedly reading the same entry then we can use the snapshot method to reduce this overhead. Given the HelloWorld entey shown in Table 1, we can write an application class as shown in Table 2.
We will illustrate the above by creating an application vomprising a Master process and a number of Client processes. Each process will process some data. When complete the Clients will send their processed data into a space where it can be collated by the Master process.
The Client processes will start processing when they receive a signal to do so from the Master process. This will be encoded as a Message entry of the form described earlier in the HelloWorld and the extended HelloWorld programmes. The Mesage class is presented in Table 1.
import net.jini.core.entry.*; /* ----------------------------------------- */ /* */ /* MESSAGE CLASS */ /* */ /* ----------------------------------------- */ public class Message implements Entry { // Fields public String content; // Constructors public Message() { } public Message(String content) { this.content = content; } } |
Table 1: Message entry class
The Master process will also create a synchronisation array (one element per Client process) with all elements set to 0. When each Client completes processing and sending its data it will take the synchronsistaion element whose index corresponds with its Client number out of the space, replace the value with 1 signalling completion, and send the element back into the space. When the entire synchronisation array is "peopled" with 1s the Master process will read the data entries from the space and collate with its own entries. The sysnchronisastion array will be encoded as a distributed array structure as described in the distributed array application example. The Element class used is pesented in table 2.
import net.jini.core.entry.*; /* ------------------------------------------------ */ /* */ /* ARRAY ELEMENT CLASS */ /* */ /* ------------------------------------------------ */ public class Element implements Entry { /* --- FIELDS --- */ public String name; public Integer index; public Integer value; /* --- CONSTRUCTORS --- */ public Element() { } public Element(String name) { this.name = name; } public Element(String name, int index) { this.name = name; this.index = new Integer(index); } public Element(String name, int index, int value) { this.name = name; this.index = new Integer(index); this.value = new Integer(value); } } |
Table 2: Element entry class
The Master and Client processes are described by two sub-classes, MasterProcess and ClientProcess, of a common parent class (Process). These three classes are presented in Tables 3, 4 and 5.
/* ------------------------------------------- */ /* */ /* PROCESS */ /* */ /* ------------------------------------------- */ // JavaSpacesUtil package import JavaSpacesUtils.SpaceAccessor; // Jini core packages import net.jini.core.lease.*; // Jini extension package import net.jini.space.JavaSpace; public class Process { /* ------ FIELDS ------ */ protected JavaSpace space; protected int[] dataArray = new int[10]; /* ------ COMSTRUCTORS ------ */ public Process() { // Get JavaSpace try { SpaceAccessor newSpaceAccessor = new SpaceAccessor("/home/staff5/ra/frans/JavaProgs/" + "JavaSpaces/JavaSpacesUtils/frans_space.prop"); space = newSpaceAccessor.getSpace(); } // Catch block catch(Exception e) { e.printStackTrace(); } } /* ------ METHODS ------ */ /* PROCESS DATA */ protected void processData() { System.out.println("Processing local data"); // Loop for(int index=0;index < dataArray.length;index++) dataArray[index] = index; } /* OUTPUT DATA */ protected void outputData() { // Loop for(int index=0;index < dataArray.length;index++) { System.out.println("(" + index + ") = " + dataArray[index]); } } } |
Table 3: Process class
/* ------------------------------------------- */ /* */ /* MASTER PROCESS */ /* */ /* ------------------------------------------- */ // Java packages import java.util.*; import java.rmi.*; // JavaSpacesUtil package import JavaSpacesUtils.SpaceAccessor; // Jini core packages import net.jini.core.lease.*; import net.jini.core.transaction.*; import net.jini.core.entry.*; // Jini extension package import net.jini.space.JavaSpace; public class MasterProcess extends Process { /* ------ FIELDS ------ */ private int numClients; /* ------ COMSTRUCTORS ------ */ /* Also calls parent constructor */ public MasterProcess(int clients) { numClients = clients; } /* ------ METHODS ------ */ /* START MASTER PROCESS */ public void startMasterProcess() { System.out.println("MASTER PROCESS STARTED"); // try block try { // Tell client processes to start sendStartMessage(); // Send syncro array sendSynchroArray(); // Process data processData(); outputData(); // Collect data fromClients collectData(); // Output data outputData(); // End System.out.println("Clear up"); clearUp(); System.out.println("MASTER PROCESS COMPLETE"); System.exit(0); } // Catch block catch(Exception e) { e.printStackTrace(); } } /* SEND START MESSAGE */ private void sendStartMessage() throws TransactionException, RemoteException { // Create Message object entry Message msg = new Message("Start"); // Send message to space space.write(msg,null,Lease.FOREVER); } /* SEND SYNCHRO ARRAY */ private void sendSynchroArray() throws TransactionException, RemoteException { System.out.println("Send synchronisation"); // Create synchro elements for (int index=0;index < numClients;index++) { Element newElement = new Element("Synchro",index,0); space.write(newElement,null,Lease.FOREVER); } } /* COLLECT DATA: Collect data from clients */ private void collectData() throws UnusableEntryException,RemoteException, TransactionException,InterruptedException { int index; int value; System.out.println("Collecting data from clients"); // Create a template Element template = new Element("data"); // Loop while(true) { // Read entry Element result = (Element) space.takeIfExists(template,null, JavaSpace.NO_WAIT); if (result == null) { if (checkForEnd()) { System.out.println("No more data"); break; } else Thread.sleep(1000); // Delay 1 second } else { index = result.index.intValue(); value = result.value.intValue(); dataArray[index] = dataArray[index] + value; } } System.out.println("Collection Complete"); } /* CHECK FOR END */ private boolean checkForEnd() throws UnusableEntryException,RemoteException, TransactionException,InterruptedException { for(int index=0;index < numClients;index++) { Element template = new Element("Synchro",index,1); Element result = (Element) space.readIfExists(template,null, space.NO_WAIT); if (result == null) return(false); } // End return(true); } /* CLEAR UP: Remove unwanted entries from space */ private void clearUp() throws UnusableEntryException,RemoteException, TransactionException,InterruptedException { // Remove message entry Message template1 = new Message("Start"); space.takeIfExists(template1,null,space.NO_WAIT); // Remove synchro array for (int index=0;index < numClients;index++) { Element template2 = new Element("Synchro",index); space.takeIfExists(template2,null,space.NO_WAIT); } } } |
Table 4: MasterProcess class
/* ------------------------------------------- */ /* */ /* CLIENT PROCESS */ /* */ /* ------------------------------------------- */ // Java packages import java.rmi.*; // JavaSpacesUtil package import JavaSpacesUtils.SpaceAccessor; // Jini core packages import net.jini.core.lease.*; import net.jini.core.transaction.*; import net.jini.core.entry.*; // Jini extension package import net.jini.space.JavaSpace; public class ClientProcess extends Process { /* ------ FIELDS ------ */ private int clientNum; /* ------ COMSTRUCTORS ------ */ /* Also calls parent constructor */ public ClientProcess(int clientN) { clientNum = clientN-1; // First index is 0 not 1! } /* ------ METHODS ------ */ /* MAIN */ public void startClientProcess() { // try block try { // Wait for Master getStartMessage(); System.out.println("CLIENT PROCESS " + (clientNum+1) + " STARTED"); // Process data processData(); // Output data outputData(); // Send data sendData(); sendEnd(); // End System.out.println("CLIENT PROCESS " + (clientNum+1) + " COMPLETE"); System.exit(0); } // Catch block catch(Exception e) { e.printStackTrace(); } } /* GET START MESSAGE */ private void getStartMessage() throws UnusableEntryException, RemoteException,TransactionException, InterruptedException { // Create Message template Message template = new Message("Start"); // Get message Message result = (Message) space.read(template,null, Long.MAX_VALUE); } /* SEND DATA */ private void sendData() throws TransactionException,RemoteException { Element newElement; System.out.println("Send data"); // Loop for(int index=0;index < dataArray.length;index++) { // Create element newElement = new Element("data",index,dataArray[index]); // Write element space.write(newElement,null,Lease.FOREVER); } } /* SEND END: Get element of "Synchro" array that represents the current process, change its value to 1 and resent into space. */ private void sendEnd() throws UnusableEntryException,RemoteException, TransactionException,InterruptedException { // Create template Element template = new Element("Synchro",clientNum); // Take element Element result = (Element) space.take(template,null,Long.MAX_VALUE); // Revise value to 1 result.value = new Integer(1); // Send back space.write(result,null,Lease.FOREVER); } } |
Table 5: ClientProcess class
We will also need Master and Client application classes, these are presented in Tables 6 and 7.
/* ------------------------------------------- */ /* */ /* MASTER PROCESS APPLICATION */ /* */ /* ------------------------------------------- */ public class MasterProcessApp { /* ------ METHODS ------ */ public static void main(String[] args) { int numClients = Integer.parseInt(args[0]); // Create instance of MasterProcess MasterProcess newMasterProcess = new MasterProcess(numClients); // Commence master process newMasterProcess.startMasterProcess(); } } |
Table 6: MasterProcessApp class
/* ------------------------------------------- */ /* */ /* CLIENT PROCESS APPLICATION */ /* */ /* ------------------------------------------- */ public class ClientProcessApp { /* ------ METHODS ------ */ public static void main(String[] args) { int clientNum = Integer.parseInt(args[0]); // Create instance of ClientProcess ClientProcess newClientProcess = new ClientProcess(clientNum); // Commence client process newClientProcess.startClientProcess(); } } |
Table 7: ClientProcessApp class
To run the above we should start the client processes before the Master process. Tables 8, 9 and 10 show the output from running two Client process and a Master process.
$ ./javaRun ClientProcessApp 1 jiniURL = jini://linux10 spaceName = frans_space CLIENT PROCESS 1 STARTED Processing local data (0) = 0 (1) = 1 (2) = 2 (3) = 3 (4) = 4 (5) = 5 (6) = 6 (7) = 7 (8) = 8 (9) = 9 Send data CLIENT PROCESS 1 COMPLETE |
Table 8: Output produced by Client process 1
$ ./javaRun ClientProcessApp 2 jiniURL = jini://linux10 spaceName = frans_space CLIENT PROCESS 2 STARTED Processing local data (0) = 0 (1) = 1 (2) = 2 (3) = 3 (4) = 4 (5) = 5 (6) = 6 (7) = 7 (8) = 8 (9) = 9 Send data CLIENT PROCESS 2 COMPLETE |
Table 9: Output produced by Client process 2
$ ./javaRun MasterProcessApp 2 jiniURL = jini://linux10 spaceName = frans_space MASTER PROCESS STARTED Send synchronisation Processing local data (0) = 0 (1) = 1 (2) = 2 (3) = 3 (4) = 4 (5) = 5 (6) = 6 (7) = 7 (8) = 8 (9) = 9 Collecting data from clients No more data Collection Complete (0) = 0 (1) = 3 (2) = 6 (3) = 9 (4) = 12 (5) = 15 (6) = 18 (7) = 21 (8) = 24 (9) = 27 Clear up MASTER PROCESS COMPLETE |
Table 10: Output produced by Master process