COMMUNICATION, SYNCHRONISATION AND SERIELIZATION

Frans Coenen

1. LOOSE COUPLING

At its simplest, message passing (communication) comprises one process that is designed to send out a message, and another that is designed to receives it. Message passing using JavaSpaces is described as being loosely coupled in that senders and receivers communicate indirectly through the space. (The opposite of loose coupling is tight coupling.)

Channels (or streams) are "information conduits" that take a series of messages.

2. SYNCHRONISATION

Synchronsisation is, of course, important when dealing with distributed systems that use some limitted but shared number of units of resource. Processes requiring these resources use these units if they are availble, if not such processes should wait till an appropriate unit becomes avaialble.




3. SEMAPHORES

Semaphores are a mechansim for controlling units of resource. A semaphore, at its simplest, is an integer counter that represents the count of available units of resource. It is controlled by two operations:

  1. down: Checks the counter, if zero the process waits until it is non zero, if the counter is non-zero (or becomes non-zero) the process decrements the counter and uses one unit of the resource.
  2. up: Increments the counter.

A disadvantage of semaphores is that some processes, waiting for a resource, may be significantly less efficient at obtaining resources than others. In the worst case a process may entirely fail to obtain any resources --- a problem known as startvation. Two possible solutions to the startvation problem are:




4. BARRIER SYNCHRONISATION

A barrier is a point in a distributed computation which eavh process must reach prior to continuing on to the next phase of the computation. Typically a barrier comprises a shared variable which must equal the total number of processes before the next phase can be commenced. The variable is initialised with the value 0, as each process completes the first phase the varibale is incremebted.




5. SERIELIZATION

When an entry is written into space it is serialized, i.e. it is turned into a series of bytes. Similarly when an entry is read from a space it is deserialized. There is thus a serialization overhead associated with writing, reading and taking entries.




6. SNAPSHOTS

If we are repeatedly reading the same entry then we can use the snapshot method to reduce this overhead. Given the HelloWorld entey shown in Table 1, we can write an application class as shown in Table 2.




7. EXAMPLE

We will illustrate the above by creating an application vomprising a Master process and a number of Client processes. Each process will process some data. When complete the Clients will send their processed data into a space where it can be collated by the Master process.

The Client processes will start processing when they receive a signal to do so from the Master process. This will be encoded as a Message entry of the form described earlier in the HelloWorld and the extended HelloWorld programmes. The Mesage class is presented in Table 1.

import net.jini.core.entry.*;

/* ----------------------------------------- */
/*                                           */
/*               MESSAGE CLASS               */
/*                                           */
/* ----------------------------------------- */

public class Message implements Entry {

    // Fields

    public String content;

    // Constructors

    public Message() {
        }

    public Message(String content) {
    	this.content = content;
	}
    }

Table 1: Message entry class

The Master process will also create a synchronisation array (one element per Client process) with all elements set to 0. When each Client completes processing and sending its data it will take the synchronsistaion element whose index corresponds with its Client number out of the space, replace the value with 1 signalling completion, and send the element back into the space. When the entire synchronisation array is "peopled" with 1s the Master process will read the data entries from the space and collate with its own entries. The sysnchronisastion array will be encoded as a distributed array structure as described in the distributed array application example. The Element class used is pesented in table 2.

import net.jini.core.entry.*;

/* ------------------------------------------------ */
/*                                                  */
/*                ARRAY ELEMENT CLASS               */
/*                                                  */
/* ------------------------------------------------ */

public class Element implements Entry {

    /* --- FIELDS --- */

    public String  name;
    public Integer index;
    public Integer value;
    
    /* --- CONSTRUCTORS --- */

    public Element() {
        }

    public Element(String name) {
    	this.name  = name;
	}	
    
    public Element(String name, int index) {
    	this.name  = name;
	this.index = new Integer(index);
	}
	
    public Element(String name, int index, int value) {
    	this.name  = name;
	this.index = new Integer(index);
	this.value = new Integer(value);
	}
    }

Table 2: Element entry class

The Master and Client processes are described by two sub-classes, MasterProcess and ClientProcess, of a common parent class (Process). These three classes are presented in Tables 3, 4 and 5.

/* ------------------------------------------- */
/*                                             */
/*                     PROCESS                 */
/*                                             */
/* ------------------------------------------- */

// JavaSpacesUtil package

import JavaSpacesUtils.SpaceAccessor;

// Jini core packages

import net.jini.core.lease.*;

// Jini extension package

import net.jini.space.JavaSpace;

public class Process {

    /* ------ FIELDS ------ */

    protected JavaSpace space;
    protected int[] dataArray = new int[10];

    /* ------ COMSTRUCTORS ------ */

    public Process() {

        // Get JavaSpace

        try {
            SpaceAccessor newSpaceAccessor = new
	   		SpaceAccessor("/home/staff5/ra/frans/JavaProgs/" +
			"JavaSpaces/JavaSpacesUtils/frans_space.prop");
	    space = newSpaceAccessor.getSpace();
	    }
        // Catch block
	catch(Exception e) {
	    e.printStackTrace();
	    }
        }

    /* ------ METHODS ------ */

    /* PROCESS DATA */

    protected void processData() {

        System.out.println("Processing local data");

        // Loop
        for(int index=0;index < dataArray.length;index++)
        			dataArray[index] = index;
        }
    		
    /* OUTPUT DATA */

    protected void outputData() {
        // Loop
        for(int index=0;index < dataArray.length;index++) {
            System.out.println("(" + index + ") = " + dataArray[index]);
            }
        }
    }

Table 3: Process class

/* ------------------------------------------- */
/*                                             */
/*                 MASTER PROCESS              */
/*                                             */
/* ------------------------------------------- */

// Java packages

import java.util.*;
import java.rmi.*;

// JavaSpacesUtil package

import JavaSpacesUtils.SpaceAccessor;

// Jini core packages

import net.jini.core.lease.*;
import net.jini.core.transaction.*;
import net.jini.core.entry.*;
 
// Jini extension package

import net.jini.space.JavaSpace;

public class MasterProcess extends Process {

    /* ------ FIELDS ------ */

    private int numClients;

    /* ------ COMSTRUCTORS ------ */

    /* Also calls parent constructor */

    public MasterProcess(int clients) {
        numClients = clients;
        }

    /* ------ METHODS ------ */
        
    /* START MASTER PROCESS */
    
    public void startMasterProcess() {
        System.out.println("MASTER PROCESS STARTED");
		
        // try block
        try {
            // Tell client processes to start
            sendStartMessage();
            // Send syncro array
            sendSynchroArray();
            // Process data
            processData();
            outputData();
            // Collect data fromClients
            collectData();
            // Output data
            outputData();
            // End
	    System.out.println("Clear up");
	    clearUp();
            System.out.println("MASTER PROCESS COMPLETE");
            System.exit(0);
            }
	// Catch block
	catch(Exception e) {
	    e.printStackTrace();
	    }
	}

    /* SEND START MESSAGE */

    private void sendStartMessage() throws TransactionException,
    			RemoteException {
         // Create Message object entry
	 Message msg = new Message("Start");
	 // Send message to space
	 space.write(msg,null,Lease.FOREVER);
	 }

    /* SEND SYNCHRO ARRAY */
    	
    private void sendSynchroArray() throws TransactionException,
    			RemoteException {
	System.out.println("Send synchronisation");
	
	// Create synchro elements
	
	for (int index=0;index < numClients;index++) {
	    Element newElement = new Element("Synchro",index,0);
	    space.write(newElement,null,Lease.FOREVER);
	    }
        }
	
    /* COLLECT DATA: Collect data from clients */

    private void collectData() throws UnusableEntryException,RemoteException,
    			TransactionException,InterruptedException {
        int index;
        int value;	

        System.out.println("Collecting data from clients");	
	
	// Create a template
	Element template = new Element("data");

	// Loop
	while(true) {
	    // Read entry
	    Element result = (Element) space.takeIfExists(template,null,
					JavaSpace.NO_WAIT);
     	    if (result == null) {
                if (checkForEnd()) {
		    System.out.println("No more data");
		    break;
		    }
                else Thread.sleep(1000);	// Delay 1 second
                }
            else {
                index = result.index.intValue();
                value = result.value.intValue();			
                dataArray[index] = dataArray[index] + value;
		}
	    }
	System.out.println("Collection Complete");
        }

    /* CHECK FOR END */

    private boolean checkForEnd() throws UnusableEntryException,RemoteException,
    			TransactionException,InterruptedException {
    	for(int index=0;index < numClients;index++) {
	    Element template = new Element("Synchro",index,1);
	    Element result = (Element) space.readIfExists(template,null,
					space.NO_WAIT);
	    if (result == null) return(false);
	    }
	
	// End
	return(true);
	}
    
    /* CLEAR UP: Remove unwanted entries from space */
    
    private void clearUp() throws UnusableEntryException,RemoteException,
    			TransactionException,InterruptedException {
        
	// Remove message entry
	
	Message template1 = new Message("Start");
	space.takeIfExists(template1,null,space.NO_WAIT);
	
	// Remove synchro array
	
	for (int index=0;index < numClients;index++) {
	    Element template2 = new Element("Synchro",index);
	    space.takeIfExists(template2,null,space.NO_WAIT);
	    }
	}
    }

Table 4: MasterProcess class

/* ------------------------------------------- */
/*                                             */
/*                 CLIENT PROCESS              */
/*                                             */
/* ------------------------------------------- */

// Java packages

import java.rmi.*;

// JavaSpacesUtil package

import JavaSpacesUtils.SpaceAccessor;

// Jini core packages

import net.jini.core.lease.*;
import net.jini.core.transaction.*;
import net.jini.core.entry.*;

// Jini extension package

import net.jini.space.JavaSpace;

public class ClientProcess extends Process {

    /* ------ FIELDS ------ */

    private int clientNum;

    /* ------ COMSTRUCTORS ------ */

    /* Also calls parent constructor */

    public ClientProcess(int clientN) {
        clientNum = clientN-1;	// First index is 0 not 1!
        }

    /* ------ METHODS ------ */
        
    /* MAIN */
    
    public void startClientProcess() {
		
        // try block
        try {
            // Wait for Master
	    getStartMessage();
	    System.out.println("CLIENT PROCESS " + (clientNum+1) + " STARTED");
            // Process data
            processData();
            // Output data
            outputData();
            // Send data
            sendData();
            sendEnd();
            // End
            System.out.println("CLIENT PROCESS " + (clientNum+1) + " COMPLETE");
            System.exit(0);
            }
	// Catch block
	catch(Exception e) {
	    e.printStackTrace();
	    }
	}

    /* GET START MESSAGE */

    private void getStartMessage() throws UnusableEntryException,
    			RemoteException,TransactionException,
			InterruptedException {
         // Create Message template
	 Message template = new Message("Start");
	 // Get message
	 Message result = (Message) space.read(template,null,
					Long.MAX_VALUE);
	 }

    /* SEND DATA */

    private void sendData() throws TransactionException,RemoteException {
        Element newElement;
	
	System.out.println("Send data");
	
	// Loop
	
        for(int index=0;index < dataArray.length;index++) {
            // Create element
            newElement = new Element("data",index,dataArray[index]);
            // Write element
            space.write(newElement,null,Lease.FOREVER);
            }
        }

    /* SEND END: Get element of "Synchro" array that represents the current
    process, change its value to 1 and resent into space. */

    private void sendEnd() throws UnusableEntryException,RemoteException,
    			 TransactionException,InterruptedException {
    	// Create template
    	Element template = new Element("Synchro",clientNum);
    	// Take element
    	Element result = (Element) space.take(template,null,Long.MAX_VALUE);
    	// Revise value to 1
    	result.value = new Integer(1);
    	// Send back
    	space.write(result,null,Lease.FOREVER);
    	}
    }

Table 5: ClientProcess class

We will also need Master and Client application classes, these are presented in Tables 6 and 7.

/* ------------------------------------------- */
/*                                             */
/*           MASTER PROCESS APPLICATION        */
/*                                             */
/* ------------------------------------------- */

public class MasterProcessApp {

    /* ------ METHODS ------ */

    public static void main(String[] args) {
        int numClients = Integer.parseInt(args[0]);

        // Create instance of MasterProcess

        MasterProcess newMasterProcess = new MasterProcess(numClients);

        // Commence master process

        newMasterProcess.startMasterProcess();
        }
    }

Table 6: MasterProcessApp class

/* ------------------------------------------- */
/*                                             */
/*           CLIENT PROCESS APPLICATION        */
/*                                             */
/* ------------------------------------------- */

public class ClientProcessApp {

    /* ------ METHODS ------ */

    public static void main(String[] args) {
        int clientNum = Integer.parseInt(args[0]);

        // Create instance of ClientProcess

        ClientProcess newClientProcess = new ClientProcess(clientNum);

        // Commence client process

        newClientProcess.startClientProcess();
        }
    }
	

Table 7: ClientProcessApp class

To run the above we should start the client processes before the Master process. Tables 8, 9 and 10 show the output from running two Client process and a Master process.

$ ./javaRun ClientProcessApp 1
jiniURL   = jini://linux10
spaceName = frans_space
CLIENT PROCESS 1 STARTED
Processing local data
(0) = 0
(1) = 1
(2) = 2
(3) = 3
(4) = 4
(5) = 5
(6) = 6
(7) = 7
(8) = 8
(9) = 9
Send data
CLIENT PROCESS 1 COMPLETE

Table 8: Output produced by Client process 1

$ ./javaRun ClientProcessApp 2
jiniURL   = jini://linux10
spaceName = frans_space
CLIENT PROCESS 2 STARTED
Processing local data
(0) = 0
(1) = 1
(2) = 2
(3) = 3
(4) = 4
(5) = 5
(6) = 6
(7) = 7
(8) = 8
(9) = 9
Send data
CLIENT PROCESS 2 COMPLETE

Table 9: Output produced by Client process 2

$ ./javaRun MasterProcessApp 2
jiniURL   = jini://linux10
spaceName = frans_space
MASTER PROCESS STARTED
Send synchronisation
Processing local data
(0) = 0
(1) = 1
(2) = 2
(3) = 3
(4) = 4
(5) = 5
(6) = 6
(7) = 7
(8) = 8
(9) = 9
Collecting data from clients
No more data
Collection Complete
(0) = 0
(1) = 3
(2) = 6
(3) = 9
(4) = 12
(5) = 15
(6) = 18
(7) = 21
(8) = 24
(9) = 27
Clear up
MASTER PROCESS COMPLETE

Table 10: Output produced by Master process