JXTA Prime Cruncher


After this simple application completes, you are ready for a more advanced tour of the JXTA API. In this example, we will design and write a distributed JXTA application that solves parallel computing problems. We will construct this application in an iterative fashion, expanding its capabilities and the set of APIs it uses with each step. The source code in this book can only cover the most important parts of the application; please check the book's Web site for the full source code.

A large subset of computational problems lend themselves to a parallel solution. Parallel execution of a task means that you break a problem into many smaller sub-problems, and cause those sub-problems to execute simultaneously. After a subtask completes, it returns its result to a master process, which then assembles the answer to the larger problem from those small results.

As an example, consider the task of creating a list of prime numbers between any two integers. Prime numbers are natural numbers that divide only by themselves and one. Natural numbers that divide by one and any other integer less than themselves are composite numbers. Thus, the simplest way to produce a list of prime numbers is to eliminate from a list of natural numbers all composites; the elements remaining in the list will all be primes.

That method is the essence of a very old albeit not very efficient algorithm: the Sieve of Eratosthenes. It is named after Eratosthenes of Cyrene (ca. 275 195 B.C.), a mathematician chiefly known for being the first to accurately estimate the diameter of the Earth; he also served as director of the famous Alexandria library.

The Sieve of Eratosthenes identifies prime numbers by iterating through a list of natural numbers and attempting to eliminate from that list all composites. It does that by dividing every number in the list by each natural number between two and the square root of the ultimate number in the list. If any number in the list divides by a number other than itself without leaving a remainder, then that number is a composite and is marked as such. After the iterations complete, eliminating all marked numbers leaves only primes in the list. The following example illustrates how the Sieve works.

Consider the list of natural numbers between 10 and 20. To find all primes between these two numbers, we will divide every element in the list by 2, 3, and 4 (4.47 being the approximate square root of 20). The original list is as follows: 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, and 20. Eliminating the numbers that divide by 2 without a remainder leaves 11, 13, 15, 17, and 19. Next, removing all numbers that divide by 3 leaves 11, 13, 17, and 19. Doing the divisions by 4 does not change the list. Therefore, we have obtained the complete list of primes between 10 and 20.

With a very long list of natural numbers for instance, with numbers several million digits long we can divide that list into multiple smaller lists, and perform the Sieve on each list simultaneously. Each of those computations might be handed out to different machines on the network, taking advantage of distributed computing resources. Prime number searching is but one of a large set of problems that can be parallelized. Among the popular uses of P2P-style software are applications such as the SETI@HOME project, which aims to decode signals from outer space in search of intelligent life on other planets, or similar projects enabling users to contribute their idle CPU resources to tasks such as simulating protein folding or decoding strands of DNA.

In this application, a master process will request two numbers from the user and produce a list of all primes between those two numbers. The master process will attempt to discover other peers on the JXTA network offering the prime number search service, and try to parcel out list segments to them for processing. After a peer completes its part of the work, it will send back an array of primes for its segment of the list. For that distribution to work, we will enable a JXTA peer to advertise its prime searching capability on the network so that others can find and connect to it. Figure 16.17 outlines this generic JXTA prime cruncher.

Figure 16.17. A server architecture suitable for finding large prime numbers in a distributed manner.

graphics/16fig17.gif

JXTA Application Design

Perhaps the most unusual aspect of this application is that every peer acts both as a master process and a slave, helping compute a sublist handed to it by a master. It is also conceivable that a slave might decide to further break down the problem into small subtasks, and act as a master process itself. This server-mode/client-mode operation is an essential P2P application design pattern. We will refer to that pattern as a SM/CM operation. It's worth noting that we will exploit SM/CM to reuse code: The master process itself will act as a slave to an adapter standing between it and the user interface: When a user specifies the two extremes of the natural number list, that adapter constructs the list and passes it to the prime cruncher component. Figure 16.18 illustrates this design.

Figure 16.18. A peer offers both server-mode and client-mode operations (SM/CM).

graphics/16fig18.gif

Message Definition

When designing a JXTA application, we must bear in mind that JXTA is a message-based system: The primary contract between peers is defined by a set of messages. Thus, the first design task is to define that message exchange. In the prime cruncher application, a peer passes a message to another peer containing the two boundaries of the list. The receiving peer then computes a list of all primes between those two extremes, and returns that sublist to the original peer. The net.jxta.endpoint.Message class abstracts out the concept of a message. It allows one to associate an arbitrary set of message elements with a key. We will use instances of that class with the following key-value structures seen in Tables 16.1 and 16.2.

Table 16.1. Request Message
Key Value
ServiceConstants.LOW_INT Lower boundary of the (sub)list
ServiceConstants.HIGH_INT Upper boundary of the (sub)list

Table 16.2. Response Message
Key Value
ServiceConstants.LOW_INT Lower boundary of the (sub)list
ServiceConstants.HIGH_INT Upper boundary of the (sub)list
ServiceConstants.PRIMELIST A string containing all primes between the bounds of the list. The primes are separated by ; characters.

Service Definition and Discovery

Next we must define a way for a master to find slaves on the network. In other words, we must specify the prior knowledge a peer must have in order to discover other peers offering the prime crunching service.

As mentioned earlier, a JXTA service is defined by its module class and specification. Thus, we will define advertisements for the number-crunching module's class and specification, and cause a peer offering that service to propagate those advertisements on the JXTA network. The prime-crunching module class will assume the name JXTACLASS:com.sams.p2p.primecruncher, and the module's spec will have the name JXTASPEC:com.sams.p2p.primecruncher.

Masters will discover peers that advertise module specifications with that name. Thus, in addition to the message definition, the service name string is another piece of information peers must posses at design time. All other information pertinent to peer interaction will be discovered at runtime.

Service Implementation

When a prime-crunching peer starts up, it must first initialize the JXTA platform to gain access to the World and Net Peer Groups. The code for that initialization is similar to our earlier example. After the platform initiated, the peer creates and publishes its advertisements, including its module class and module spec advertisements.

The module spec advertisement will include the advertisement of a pipe. Clients discovering a module spec advertisement for the service must obtain the pipe advertisement, and connect to the service via that pipe.

After it has published its advertisements, our service opens an input pipe and listens for incoming messages. When a message arrives, the service attempts to obtain the high and low boundary numbers from it, and pass those onto a component responsible for generating the primes-only sublist. When that component returns its results (an array containing the primes), the prime cruncher service attempts to create a message with a result and then send that message back to the client. In the first iteration, the service will simply print out the message it receives. In subsequent refinements, it will open a pipe back to the client and send the results back to it. The client will then assemble the results from all the peers it heard back from and save the resulting master list into the file.

The outline of this server component is shown in Listing 16.2.

Listing 16.2 Outline of PrimePeer and Initialization of a JXTA Peer
 package primecruncher; import net.jxta.peergroup.PeerGroup; import net.jxta.peergroup.PeerGroupFactory; import net.jxta.peergroup.PeerGroupID; import net.jxta.discovery.DiscoveryService; import net.jxta.pipe.PipeService; import net.jxta.pipe.InputPipe; import net.jxta.pipe.PipeID; import net.jxta.exception.PeerGroupException; import net.jxta.protocol.ModuleClassAdvertisement; import net.jxta.protocol.ModuleSpecAdvertisement; import net.jxta.protocol.PipeAdvertisement; import net.jxta.document.*; import net.jxta.platform.ModuleClassID; import net.jxta.platform.ModuleSpecID; import net.jxta.id.IDFactory; import net.jxta.endpoint.Message; import java.io.FileInputStream; import java.io.IOException; import java.io.FileOutputStream; import java.io.StringWriter; public class PrimePeer {         private static PeerGroup group;         private static DiscoveryService discoSvc;         private static PipeService pipeSvc;         private InputPipe inputPipe;         private static final String PIPE_ADV_FILE = "primeserver_pipe.adv";         public static void main(String[] argv) {                PrimePeer pp = new PrimePeer();                pp.startJxta();                pp.doAdvertise();                pp.startService();         }         public PrimePeer() {         }         private void startJxta() {                try {                        group = PeerGroupFactory.newNetPeerGroup();                        discoSvc = group.getDiscoveryService();                        pipeSvc = group.getPipeService();                } catch (PeerGroupException e) {                        System.out.println("Cannot create Net Peer Group: " + e.getMessage( graphics/ccc.gif));                        System.exit(-1);                }         }       /**        * Create and propagate advertisements        */         private void doAdvertise() {            ...         }         /*          * Start up the service, listen for incoming messages on the service's input pipe.          */         private void startService() {            ...         }     /**          * Compute the requested list of prime numbers.           */         private void processInput(String high, String low) {            ...         } } 

In the startJxta() service initialization method, we first obtain a reference to the World Peer Group; this is done via a static PeerGroupFactory method. Calling that method will cause the JXTA runtime to bootstrap. Next, we obtain references to two peer group services that the Net Peer Group provides: the DiscoveryService and the PipeService. We will use both when creating the service's advertisements.

Creating and Publishing Advertisements

As we mentioned earlier, the JXTA virtual network relies on JXTA IDs to identify network resources. The discovery of those resources occurs via advertisements. The net.jxta.id package contains the ID class, as well as a factory for creating various kinds of IDs IDFactory. Listing 16.3 uses IDFactory to create a ModuleClassID for our new module.

In JXTA, a net.jxta.document.Document serves as a general container for data. A Document in JXTA is defined by the MIME media type of its content, and it has the capability of producing an InputStream with the content itself. In that sense, Document is somewhat analogous to an HTTP stream. JXTA makes no attempt to interpret the content of a Document; that content is part of an application-level protocol.

A Document that holds the advertisement of a JXTA network resource is a net.jxta.document.Advertisement. An Advertisement is a StructuredDocument, composed of a hierarchy of elements similar to XML. Structured documents can be nested, which enables a document to be manipulated without regard to the physical representation of its data.

As with any StructuredDocument, an Advertisement can be represented in XML or plain text formats. An Advertisement contains the ID of the resource it advertises, the type of the Advertisement, as well as an expiration time specified as an absolute time value. The JXTA API provides a convenient factory, AdvertisementFactory, to create different types of advertisements. Listing 16.3 shows the creation of a new ModuleClassAdvertisement via that factory class. Note the manner in which the ModuleClassID is added to the advertisement.

Listing 16.3 Creating and Advertising a Module Class
 private void doAdvertise() {         ModuleClassAdvertisement classAd =                (ModuleClassAdvertisement)AdvertisementFactory.newAdvertisement(                        ModuleClassAdvertisement.getAdvertisementType());         ModuleClassID classID = IDFactory.newModuleClassID();         classAd.setModuleClassID(classID);         classAd.setName(ServiceConstants.CLASS_NAME);         classAd.setDescription("A prime number crunching service.");         try {                discoSvc.publish(classAd, DiscoveryService.ADV);                discoSvc.remotePublish(classAd, DiscoveryService.ADV);                System.out.println("Published module class adv.");         } catch (IOException e) {                System.out.println("Trouble publishing module class adv: " +                       e.getMessage());         } 

The JXTA net.jxta.discovery.DiscoveryService is a group service provided by the Net Peer Group, and its main purpose is to facilitate the publishing and discovery of advertisements. It provides two modes of both publishing and discovery: local and remote. The local mode has to do with the peer's local cache local discovery means looking for advertisements in that cache, and local publishing means entering an advertisement into the local cache. Remote, as its name says, means performing discovery and publishing in the context of the entire peer group. Thus, query messages propagate throughout the JXTA virtual network in accord with the protocols we described previously, and responses are resolved to those queries as they arrive from the network. Thus, remote discovery is asynchronous it might take quite a while for a desired advertisement type to be found on the JXTA network. Listing 16.3 shows both the remote and local publishing of the ModuleClassAdvertisement.

Similar to the preceding process, we create a ModuleSpec ID via the IDFactory class, and its corresponding advertising is obtained from the AdvertisementFactory (see Listing 16.4).

Listing 16.4 Creating a New ModuleSpecAdvertisement
         ModuleSpecAdvertisement specAd =                (ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement(                        ModuleSpecAdvertisement.getAdvertisementType());         ModuleSpecID specID = IDFactory.newModuleSpecID(classID);         specAd.setModuleSpecID(specID);         specAd.setName(ServiceConstants.SPEC_NAME);         specAd.setDescription("Specification for a prime number crunching service");         specAd.setCreator("Sams Publishing");         specAd.setSpecURI("http://www.samspulishing.com/p2p/primecruncher"); specAd.setVersion("Version 1.0"); 

Recall that a ModuleSpecAdvertisement defines a wire protocol, or a network behavior, to access a service. Thus, we need to provide a PipeAdvertisement as a parameter to the ModuleSpecAdvertisement. Because the module's advertisements will be cached by peers on the network, it is important to ensure that each ModuleSpecAdvertisement refers to the same pipe. Thus, we must save the pipe's advertisement to persistent storage and read that data from storage whenever creating a new pipe advertisement, as shown in Listing 16.5. (If the advertisement has not been saved to disk yet, create and save a new one.)

Listing 16.5 Creating a Pipe Advertisement
         PipeAdvertisement pipeAd = null;         try {                FileInputStream is = new FileInputStream(PIPE_ADV_FILE);                pipeAd = (PipeAdvertisement)AdvertisementFactory. newAdvertisement(                     new MimeMediaType("text/xml"), is);                is.close();         } catch (IOException e) {                pipeAd = (PipeAdvertisement)AdvertisementFactory. newAdvertisement(                     PipeAdvertisement.getAdvertisementType());                PipeID pid = IDFactory.newPipeID(group.getPeerGroupID());                pipeAd.setPipeID(pid);                //save pipeAd in file                Document pipeAdDoc = pipeAd.getDocument(new MimeMediaType ("text/xml"));                try {                      FileOutputStream os = new FileOutputStream(PIPE_ADV_FILE);                      pipeAdDoc.sendToStream(os);                      os.flush();                      os.close();                      System.out.println("Wrote pipe advertisement to disk.");                } catch (IOException ex) {                      System.out.println("Can't save pipe advertisement to file " +                             PIPE_ADV_FILE);                      System.exit(-1);                }          } 

The following code segment saves a pipe advertisement to disk in XML format. For instance, one running of this code produced the following XML document:

 <?xml version="1.0"?>  <!DOCTYPE jxta:PipeAdvertisement> <jxta:PipeAdvertisement xmlns:jxta="http://jxta.org">         <Id> urn:jxta:uuid-59616261646162614E5047205032503382CCB236202640F5A242ACE15A8F9D7C04         </Id>         <Type>               JxtaUnicast         </Type> </jxta:PipeAdvertisement> 

We subsequently pass this new PipeAdvertisement as a parameter to the ModuleSpecAdvertisement, as shown in Listing 16.6.

Listing 16.6 Adding the PipeAdvertisement as a Parameter to the ModuleSpecAdvertisement
 specAd.setPipeAdvertisement(pipeAdv); 

At this point, we are ready to publish the ModuleSpecAdvertisement both locally and remotely, as illustrated in Listing 16.7.

Listing 16.7 Local and Remote Publishing of a ModuleSpecAdvertisement
         try {                discoSvc.publish(specAd, DiscoveryService.ADV);                discoSvc.remotePublish(specAd, DiscoveryService.ADV);                System.out.println("Published module spec adv");         } catch (IOException e) {                System.out.println("Trouble publishing module spec adv: " +                        e.getMessage()); } 

Finally, we create an InputPipe based on the pipe advertisement in Listing 16.8.

Listing 16.8 InputPipe Creation from a PipeAdvertisement
         //create an input pipe based on the advertisement         try {                inputPipe = pipeSvc.createInputPipe(pipeAd);                System.out.println("Created input pipe");         } catch (IOException e) {                System.out.println("Can't create input pipe. " + e.getMessage());         } } 

These are all the steps needed to publish a new JXTA service. Recall that a module's class advertisement advertises the fact that the module functionality exists in a peer group; it is a fairly abstract concept, somewhat analogous to a Java interface that defines an API, but does not provide an implementation. A module's spec advertisement, on the other hand, specifies a wire protocol to access the service. In this case, that wire protocol consists of an InputPipe to which other peers can send messages. It is to that InputPipe that the messages specifying the two boundary numbers will arrive.

Processing Messages from an InputPipe

The next step in implementing the prime cruncher peer is to process the received messages. We will break this task down into handling incoming messages, calculating the desired list of prime numbers, and sending back a response. Listing 16.9 shows the first part of the activity.

Listing 16.9 Processing Messages on an InputPipe
 private void startService() {         while (true) {                Message msg = null;                try {                        msg = inputPipe.waitForMessage();                } catch (InterruptedException ex) {                        inputPipe.close();                        return;                }                String highInt = msg.getString(ServiceConstants.HIGH_INT);                String lowInt = msg.getString(ServiceConstants.LOW_INT);                if (highInt != null || lowInt != null) {                       processInput(highInt, lowInt);                }         } } 

As mentioned before, the net.jxta.endpoint.Message object is sent between two peers by EndpointService (an implementation of the Endpoint Protocol discussed earlier). A Message consists of a set of MessageElements, and features a destination EndpointAddress to facilitate its routing through the JXTA network. A message element can be any array of bytes, and Message has the capability to retrieve an element as a String. When a new message element is specified, it can be associated with a MIME type, as well as a String that serves as the element's key. In this method implementation, we retrieve the message elements referenced by the keys ServiceConstants.HIGH_INT and ServiceConstants.LOW_INT. If both elements are valid Strings, we pass them onto a private method, processInput().

processInput() is responsible for executing the Sieve of Eratosthenes algorithm (or any other algorithm) to produce a list of all prime numbers between LOW_INT and HIGH_INT. To save space, we will not show that part of the code here; instead, the full source code is available for download from samspublishing.com. In addition, the full source code also contains a version of startService() that retrieves a PipeAdvertisement from the Message (as another message element), opens a pipe back to the client, and sends the list of prime numbers back to the client.

The Prime Cruncher Client

The purpose of the client in this application is to distribute the computation load to as many peers advertising the number-crunching service as possible. Consider a user wanting to obtain all prime numbers between 1 and 10,000. When a peer receives that user request, it needs to determine how many other peers it can share the task with. Thus, it must continuously discover peers advertising the prime number service and maintain a cache of those peers' advertisements. If a peer has, for example, 10 other peers it can share the work with, then it might then create a message with LOW_INT set to 1 and HIGH_INT set to 1000, then another message with the numbers set to 1001 and 2000, respectively, and so forth. Finally, the client would open a pipe to each of the 10 peers, and transmit one message to each. Figure 16.19 describes that peer-to-peer message exchange.

Figure 16.19. The peer-to-peer message exchange.

graphics/16fig19.gif

The client's skeleton looks similar to the server's (see Listing 16.10). It also initializes the Net Peer Group, and obtains from it the group's discovery and pipe services.

Listing 16.10 PrimeClient
 package primecruncher; import net.jxta.peergroup.PeerGroup; import net.jxta.peergroup.PeerGroupFactory; import net.jxta.discovery.DiscoveryService; import net.jxta.discovery.DiscoveryListener; import net.jxta.discovery.DiscoveryEvent; import net.jxta.pipe.PipeService; import net.jxta.pipe.OutputPipe; import net.jxta.pipe.PipeID; import net.jxta.exception.PeerGroupException; import net.jxta.protocol.DiscoveryResponseMsg; import net.jxta.protocol.ModuleSpecAdvertisement; import net.jxta.protocol.PipeAdvertisement; import net.jxta.document.StructuredTextDocument; import net.jxta.document.MimeMediaType; import net.jxta.document.TextElement; import net.jxta.document.AdvertisementFactory; import net.jxta.id.IDFactory; import net.jxta.endpoint.Message; import java.util.Enumeration; import java.io.StringWriter; import java.io.IOException; import java.net.URL; import java.net.MalformedURLException; import java.net.UnknownServiceException; import java.util.HashSet; import java.util.Set; public class PrimeClient implements DiscoveryListener {         private static PeerGroup group;         private static DiscoveryService discoSvc;         private static PipeService pipeSvc;         private OutputPipe outputPipe;         private Set adverts = new HashSet();         public PrimeClient() {         }         public static void main(String[] argv) {                Client cl = new Client();                cl.startJxta();                cl.doDiscovery();         }         public int[] processPrimes(int low, int high) {         }         private void startJxta() {                try {                        group = PeerGroupFactory.newNetPeerGroup();                        discoSvc = group.getDiscoveryService();                        pipeSvc = group.getPipeService();                } catch (PeerGroupException e) {                        System.out.println("Can't create net peer group: " +                                e.getMessage());                        System.exit(-1);                }         }         private void doDiscovery() {         } } 

Although PrimePeer's key responsibility is to advertise its service and process incoming messages, PrimeClient must participate in the service discovery process. The doDiscovery() method initiates service discovery. First, the peer looks into its local cache for advertisements that match a Name attribute in the prime computing module's specification. It then processes each advertisement it finds there (see Listing 16.11).

Listing 16.11 Performing Local Discovery
         System.out.println("Starting service discovery...");         System.out.println("Searching local cache for " +                ServiceConstants.SPEC_NAME + " advertisements");         Enumeration res = null;         try {                 res = discoSvc.getLocalAdvertisements(DiscoveryService.ADV,                       "Name", ServiceConstants.SPEC_NAME);         } catch (IOException e) {                System.out.println("IO Exception.");         }         if (res != null) {                while (res.hasMoreElements()) {                        processAdv((ModuleSpecAdvertisement)res.nextElement());                } } 

Next, the peer initiates remote advertisement discovery. Remote discovery means that discovery queries propagate through the JXTA network, and responses arrive as suitable advertisements are found. Thus, remote discovery is an asynchronous process. We pass a DiscoveryListener as an argument to DiscoveryService's getRemoteAdvertisements() method. In addition, we must also specify a threshold of the number of advertisements we desire to receive from each peer (see Listing 16.12).

Once remote discovery is initiated, discovered advertisements are cached in the local advertisement cache. So, the next time the peer starts up, it will likely discover advertisements from that cache.

Listing 16.12 Initiating Remote Service Discovery
         System.out.println("Starting remote discovery...");         discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV,                 "Name", ServiceConstants.SPEC_NAME, 1, this); } 

DiscoveryListener specifies the discoveryEvent() method that gets called each time an advertisement matching our criteria is found. A DiscoveryEvent contains a DiscoveryReponseMsg, containing the actual advertisements found through remote discovery. We obtain an enumeration of those advertisements and process each, as seen in Listing 16.13.

Listing 16.13 Implementing a DiscoveryListener
         public void discoveryEvent(DiscoveryEvent event) {                System.out.println("DiscoveryEvent called");                DiscoveryResponseMsg  mes = event.getResponse();                //these contain the responses found                Enumeration res = mes.getResponses();                if (res != null) {                       while (res.hasMoreElements()) {                               processAdv((ModuleSpecAdvertisement) res.nextElement());                       }                } } 

Our processAdv() method is very simple: It inserts each ModuleSpecAdvertisement into a set. A set ensures that no duplicate advertisements are stored. This set acts as a cache for module spec advertisements:

         private void processAdv(ModuleSpecAdvertisement ad) {                 adverts.add(ad); } 
Advertisement Processing

After we've set up a discovery listener, it will keep adding newly discovered module spec advertisements to our simple local cache. Each time the processPrimes() method gets called, the client peer will attempt to contact the peers represented by these module spec advertisements, connect to their input pipes, and pass a message that initiates the prime number search on each of those peers.

The first item in this method is to determine the set of peers we can delegate work to. Recall that an advertisement has an expiration date associated with it. Thus, we must eliminate advertisements that are no longer valid:

 Public int[] processPrimes(int low, int high) {          Set setCopy = null;         synchronized(adverts) {                Set setCopy = (Set) adverts.clone();         }         ArrayList workingList = new ArrayList();         ArrayList expired = new ArrayList();         long currentTime = System.getCurrentTimeMillis();         Iterator it = workingSet.iterator();         while (it.hasNext()) {                ModuleSpecAdvertisement ad = (ModuleSpecAdvertisement)it.next();                if (ad.getLocalExpirationTime() > currentTime + (2 * 60 *1000)) {                       workingList.addElement(ad);                } else {                        expired.addElement(ad);                }         } removeExpired(expired); 

The preceding code segment performs a simple cache management of discovered advertisements, delegating the removal of all advertisements that have either expired or about to expire shortly to the removeExpired() method (not shown here, but is included in the full source code).

After we have a set of valid advertisements, we can start processing them in order to obtain from them the pipe advertisement that we must use to send the messages. Because we assume (at least in this example) that all those advertisements refer to peers that we will actually use in our prime searching task, we first break down the job into smaller tasks corresponding to each peer.

Note that this job distribution is rather contrived: Some peers might be more capable than others, and some might have better network connections than others. Those differences should be taken into account when assigning tasks to a peer. Also, in practice it might not make sense to divide the job into too many small segments, because the network communication time could easily dominate the time spent on the actual processing of the prime numbers list. However, this example aims to illustrate how to obtain a pipe advertisement from a ModuleSpecAdvertisement, how to create a new message, and then how to send that message down the pipe.

Listing 16.14 shows how the natural number list is broken into sublists, each sublist corresponding to a message that will be sent to a peer participating in the computation. Messages are then inserted into a hash map, and the key of the map indicates a message's status: Was it sent out already? Have we received a result for that computation yet?

Listing 16.14 Creating New Messages
 Map messageMap = new HashMap(); int size = workingList.size() int mod = high % size; high -= mod; int perPiece = high / size; for (int i=0; i < size; i++) {         //create a new message         Message msg = pipeSvc.createMessage();         msg.setString(ServiceConstants.LOW_INT, low);         //last message will get to compute a bit more         if (i == size-1) {                high = low + perPiece   1 + mod;         } else {                high = low + perPiece -1;         }         msg.setString(ServiceConstants.HIGH_INT, high);         low += perPiece;         //we neither sent the message, nor did we get a response         StatusMap statusMap = new StatusMap(false, false);         StatusMap statusMap = new StatusMap(false, false);         messageMap.put(statusMap, msg); } 

StatusMap is simply a pairing of two Boolean values; it is not listed here.

Our final step is to extract the pipe advertisements from each ModuleSpecAdvertisement, open each pipe, and send a message to that pipe. Finally, we will mark the message as sent.

Recall that an advertisement is just a structured document, similar to an XML document. It can easily be converted to a text document and printed out. It's useful to inspect the contents of the advertisement during development and at debug-time (see Listing 16.15).

Listing 16.15 Printing an Advertisement
 Collection ads = messageMap.values(); Iterator it = ads.iterator(); while (it.hasNext()) {         ModuleSpecAdvertisement ad = (ModuleSpecAdvertisement)it.next();         //First, print out ModuleSpec advertisement on standard output         StructuredTextDocument doc =               (StructuredTextDocument)ad.getDocument(new MimeMediaType ("text/plain"));         try {                StringWriter out = new StringWriter();                doc.sendToWriter(out);                System.out.println(out);                out.close();         } catch (IOException e) {         } ... 

As we discussed earlier, a StructuredTextDocument consists of elements, and one such element is a parameter. When we constructed the ModuleSpecAdvertisement for our service, we entered the service's pipe advertisement as a parameter. The parameter is just another StructuredDocument element that we can manipulate in the same way we would an XML document.

In parsing the advertisement's parameter element, we first obtain the pipe's ID and type. The pipe's ID conforms to a URN specification, outlined in the JXTA specifications, which encodes the 128-bit special identifier for the pipe. The following is an example of such a URN:

 urn:jxta:uuid-59616261646162614E5047205032503382CCB236202640F5A242ACE15A8F9D7C04  

The IDFactory class is capable of constructing the PipeID object from such a URN. That is the mechanism we use to assign the pipe ID to the pipe advertisement (see Listing 16.16).

Listing 16.16 Working with Advertisement Parameters
         StructuredTextDocument param = (StructuredTextDocument)ad.getParam();         String pipeID = null;         String pipeType = null;         Enumeration en = null;          if (param != null) {                en = param.getChildren("jxta:PipeAdvertisement");         }         Enumeration child = null;          if (en != null) {                 child = ((TextElement)en.nextElement()).getChildren();         }          if (child != null) {                 while (child.hasMoreElements()) {                        TextElement el = (TextElement)child.nextElement();                        String elementName = el.getName();                        if (elementName.equals("Id")) {                               pipeID = el.getTextValue();                        }                        if (elementName.equals("Type")) {                               pipeType = el.getTextValue();                        }               }         }         if (pipeID != null || pipeType != null) {                PipeAdvertisement pipeAdvert = (PipeAdvertisement)                        AdvertisementFactory.newAdvertisement(                               PipeAdvertisement.getAdvertisementType());                try {                        URL pidURL = new URL(pipeID);                        PipeID pid = (PipeID)IDFactory.fromURL(pidURL);                        pipeAdvert.setPipeID(pid);                } catch (MalformedURLException e) {                        System.out.println("Wrong URL: " + e.getMessage());                        return;                } catch (UnknownServiceException e) {                        System.out.println("Unknown Service: " + e.getMessage());                        return;                } } 

Based on this PipeAdvertisement, we are now able to construct an output pipe that connects to the remote peer's input pipe, as shown in Listing 16.17. Recall that a pipe is unidirectional communication channel. Thus, we do not expect to hear back from the remote peer via this pipe. The remote peer performs an essentially similar task, opening a pipe back to the client and sending it a message with the results of the computation. (We do not show that part of the code here; please see the Web site for the full sample source code.)

Listing 16.17 Creating an Output Pipe
         try {                outputPipe = pipeSvc.createOutputPipe(pipeAdvert, 30000);                outputPipe.send(msg);                System.out.println("Sent message on output pipe");         } catch (IOException e) {                System.out.println("Can't send message through pipe: " + e.getMessage());         } } 

An interesting thing about this pipe-creation mechanism is that a peer might have changed network identities between sending out the ModuleSpecAdvertisement and a client contacting it for useful work. However, the peer's virtual identity on the JXTA network remains the same, and the runtime services ensure that the pipe advertised by the ModuleSpecAdvertisement connects.

After you have downloaded the full source code from the Web site and made the decision to run both peers on the same machine, you'll need to start them from separate directories and specify a different network communication port for each in the JXTA Configurator.

To start the server application, ensure that all the JXTA classes are in your classpath (this process is detailed in the JXTA installation document), and then type the following command:

 java primecruncher.PrimePeer  

You might also run the command with the following optional parameters. These parameters allow you to bypass the JXTA login screen:

 java  Dnet.jxta.tls.principal=USERNAME      Dnet.jxta.tls.password=PASSWORD primecruncher.PrimePeer 

By substituting your JXTA username and password you can run the client similarly:

 java  Dnet.jxta.tls.princincipal=USERNAME     -Dnet.jxta.tls.password=PASSWORD primecruncher.PrimeClient 

The prime-finder application of this chapter operates as a full-fledged Java application, with its own user interface and main() method. In the next chapter, we will learn how to interactively invoke this application from the JXTA Shell.



JavaT P2P Unleashed
JavaT P2P Unleashed
ISBN: N/A
EAN: N/A
Year: 2002
Pages: 209

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net