Subversion Repository Public Repository

litesoft

Diff Revisions 948 vs 950 for /trunk/Java/core/Server/src/org/litesoft/peertopeer/nonpublic/broadcastdiscovery/BroadcastMessageManager.java

Diff revisions: vs.
  @@ -1,236 +1,237 @@
1 - // This Source Code is in the Public Domain per: http://unlicense.org
2 - package org.litesoft.peertopeer.nonpublic.broadcastdiscovery;
3 -
4 - import org.litesoft.commonfoundation.typeutils.*;
5 - import org.litesoft.logger.*;
6 - import org.litesoft.peertopeer.*;
7 - import org.litesoft.peertopeer.nonpublic.peermanagement.*;
8 - import org.litesoft.util.*;
9 -
10 - import java.io.*;
11 - import java.net.*;
12 -
13 - public class BroadcastMessageManager implements P2PConstants {
14 - public static final Logger LOGGER = LoggerFactory.getLogger( BroadcastMessageManager.class );
15 -
16 - private static final byte BCA_BYTE = (byte) 255;
17 - private static final byte[] BROADCAST_ADDRESS = new byte[]{BCA_BYTE, BCA_BYTE, BCA_BYTE, BCA_BYTE};
18 -
19 - private String mOurName, mAppGroupID, mAppGroupVersion, mMessagePrefixFilter;
20 - private AppGroupVersionCompatibilityChecker mCompatibilityChecker;
21 -
22 - public BroadcastMessageManager( String pOurName, String pAppGroupID, String pAppGroupVersion, AppGroupVersionCompatibilityChecker pCompatibilityChecker ) {
23 - mOurName = assertNotEmptyAndNoCommas( "OurName", pOurName );
24 - mAppGroupID = assertNotEmptyAndNoCommas( "AppGroupID", pAppGroupID );
25 - mAppGroupVersion = assertNotEmptyAndNoCommas( "AppGroupVersion", pAppGroupVersion );
26 - mCompatibilityChecker = pCompatibilityChecker;
27 - mMessagePrefixFilter = mAppGroupID + ",";
28 - }
29 -
30 - public String getOurName() {
31 - return mOurName;
32 - }
33 -
34 - public synchronized void assert_joinGroup_notCalled() {
35 - if ( mOurServer != null ) {
36 - throw new IllegalStateException( "BroadcastMessageManager has already joined (or attempted to join) Group: " + mAppGroupID );
37 - }
38 - }
39 -
40 - public synchronized void joinGroup( int pServerPort, PeerConnectionHandlerFactory pConnectionHandlerFactory )
41 - throws IOException {
42 - P2Putils.validatePort( pServerPort );
43 - Objects.assertNotNull( "PeerConnectionHandlerFactory", pConnectionHandlerFactory );
44 - assert_joinGroup_notCalled();
45 -
46 - mOurServer = new OurServer( getClass().getName(), pConnectionHandlerFactory );
47 -
48 - // get a datagram socket
49 - DatagramSocket socket = new DatagramSocket();
50 -
51 - try {
52 - socket.setBroadcast( true );
53 -
54 - mOurServer.startListening();
55 -
56 - byte[] buf = createAnnouceMessage( pServerPort );
57 - InetAddress address = InetAddress.getByAddress( BROADCAST_ADDRESS );
58 - DatagramPacket packet = new DatagramPacket( buf, buf.length, address, DISCOVERY_PORT );
59 - socket.send( packet );
60 - }
61 - catch ( NoRouteToHostException e ) {
62 - System.err.println( "*** Unable to Broadcast Our Existence - Proceeding Stand Alone ***" );
63 - mOurServer.stopListening();
64 - }
65 - catch ( IOException e ) {
66 - mOurServer.stopListening();
67 - throw e;
68 - }
69 - finally {
70 - socket.close();
71 - }
72 - }
73 -
74 - private void processAnnounceMessage( String pMessage, InetAddress pInetAddress, PeerConnectionHandlerFactory pConnectionHandlerFactory ) {
75 - if ( !pMessage.startsWith( mMessagePrefixFilter ) ) {
76 - return;
77 - }
78 - int cAt1 = pMessage.indexOf( ',' );
79 - int cAt2 = pMessage.indexOf( ',', cAt1 + 1 );
80 - int cAt3 = pMessage.indexOf( ',', cAt2 + 1 );
81 - if ( (cAt2 != -1) && (cAt3 != -1) ) {
82 - String thierAppGroupVersion = pMessage.substring( cAt1 + 1, cAt2 ).trim();
83 - String thierName = pMessage.substring( cAt2 + 1, cAt3 ).trim();
84 - String thierServerPort = pMessage.substring( cAt3 + 1 ).trim();
85 - if ( (thierAppGroupVersion.length() != 0) && (thierName.length() != 0) && (thierServerPort.length() != 0) ) {
86 - int zPort = parseServerPort( thierServerPort );
87 - if ( zPort != 0 ) {
88 - PeerDefinition zNewPeerDef = new PeerDefinition( thierName, pInetAddress, zPort );
89 - if ( !mAppGroupVersion.equals( thierAppGroupVersion ) &&
90 - ((mCompatibilityChecker == null) || !mCompatibilityChecker.areCompatibile( mAppGroupVersion, thierAppGroupVersion )) ) {
91 - pConnectionHandlerFactory
92 - .handleAnnouncedPeerWithIncompatibleAppGroupVersions( zNewPeerDef, mOurName, mAppGroupVersion, thierAppGroupVersion );
93 - } else {
94 - pConnectionHandlerFactory.handleAnnouncedPeer( zNewPeerDef );
95 - }
96 - return;
97 - }
98 - }
99 - }
100 - LOGGER.error.log( "MalformedBroadcastAnnounceMessage from ", pInetAddress, ":", pMessage );
101 - }
102 -
103 - private byte[] createAnnouceMessage( int pServerPort )
104 - throws UnsupportedEncodingException {
105 - return createAnnouceMessage( mAppGroupID + "," + mAppGroupVersion + "," + mOurName + "," + pServerPort );
106 - }
107 -
108 - private static byte[] createAnnouceMessage( String pMessage ) {
109 - byte[] zSBytes;
110 - try {
111 - zSBytes = pMessage.getBytes( FileUtils.UTF_8 );
112 - }
113 - catch ( UnsupportedEncodingException e ) {
114 - LOGGER.error.log( e ); // shouldn't happen
115 - zSBytes = new byte[0];
116 - }
117 - int rvLength = zSBytes.length + BROADCAST_PREFIX.length;
118 - if ( rvLength > 255 ) {
119 - throw new IllegalArgumentException( "Announce Message too long: " + pMessage );
120 - }
121 - byte[] rv = new byte[rvLength];
122 - int to = 0;
123 - for ( byte b : BROADCAST_PREFIX ) {
124 - rv[to++] = b;
125 - }
126 - for ( byte b : zSBytes ) {
127 - rv[to++] = b;
128 - }
129 - return rv;
130 - }
131 -
132 - private static String parseAnnouceMessage( DatagramPacket packet ) {
133 - int zDataLength = packet.getLength();
134 - if ( zDataLength < MINIMUM_BROADCAST_MESSAGE_LENGTH ) {
135 - return null;
136 - }
137 - byte[] zData = packet.getData();
138 - int zEncodedStringStartsAt = BROADCAST_PREFIX.length;
139 - for ( int i = 0; i < zEncodedStringStartsAt; i++ ) {
140 - if ( zData[i] != BROADCAST_PREFIX[i] ) {
141 - return null;
142 - }
143 - }
144 - String zMessage = null;
145 - try {
146 - zMessage = new String( zData, zEncodedStringStartsAt, zDataLength - zEncodedStringStartsAt, FileUtils.UTF_8 );
147 - }
148 - catch ( UnsupportedEncodingException e ) {
149 - LOGGER.error.log( e ); // shouldn't happen
150 - }
151 - catch ( RuntimeException e ) {
152 - // Not Properly Encoded?
153 - LOGGER.error.log( e );
154 - }
155 - return zMessage;
156 - }
157 -
158 - private OurServer mOurServer = null;
159 -
160 - private class OurServer extends Thread {
161 - private PeerConnectionHandlerFactory mConnectionHandlerFactory;
162 - private DatagramSocket mSocket = null;
163 -
164 - public OurServer( String pName, PeerConnectionHandlerFactory pConnectionHandlerFactory ) {
165 - super( pName );
166 - mConnectionHandlerFactory = pConnectionHandlerFactory;
167 - setDaemon( true );
168 - }
169 -
170 - public synchronized void startListening()
171 - throws SocketException {
172 - mSocket = new DatagramSocket( null );
173 - mSocket.setReuseAddress( true );
174 - mSocket.bind( new InetSocketAddress( (InetAddress) null, DISCOVERY_PORT ) );
175 - start();
176 - }
177 -
178 - public synchronized boolean stopListening() {
179 - DatagramSocket zSocket = mSocket;
180 - mSocket = null;
181 - if ( zSocket == null ) {
182 - return false;
183 - }
184 - zSocket.close();
185 - return true;
186 - }
187 -
188 - private synchronized DatagramSocket stillListening() {
189 - return mSocket;
190 - }
191 -
192 - @Override
193 - public void run() {
194 - for ( DatagramSocket zSocket; null != (zSocket = stillListening()); ) {
195 - DatagramPacket packet = new DatagramPacket( new byte[256], 256 );
196 - try {
197 - // receive request
198 - zSocket.receive( packet );
199 - }
200 - catch ( IOException e ) {
201 - if ( stopListening() ) {
202 - LOGGER.error.log( e );
203 - }
204 - return;
205 - }
206 - try {
207 - String zMessage = parseAnnouceMessage( packet );
208 - if ( zMessage != null ) {
209 - processAnnounceMessage( zMessage, packet.getAddress(), mConnectionHandlerFactory );
210 - }
211 - }
212 - catch ( RuntimeException e ) {
213 - LOGGER.error.log( e );
214 - }
215 - }
216 - }
217 - }
218 -
219 - private static String assertNotEmptyAndNoCommas( String pErrorMessage, String pStringToAssert ) {
220 - pStringToAssert = Strings.assertNotNullNotEmpty( pErrorMessage, pStringToAssert );
221 - if ( pStringToAssert.contains( "," ) ) {
222 - Strings.error( "String", pErrorMessage, " is not allowed to contain any Commas (',')" );
223 - }
224 -
225 - return pStringToAssert;
226 - }
227 -
228 - private static int parseServerPort( String pThierServerPort ) {
229 - try {
230 - return P2Putils.validatePort( Integer.parseInt( pThierServerPort ) );
231 - }
232 - catch ( RuntimeException e ) {
233 - return 0;
234 - }
235 - }
236 - }
1 + // This Source Code is in the Public Domain per: http://unlicense.org
2 + package org.litesoft.peertopeer.nonpublic.broadcastdiscovery;
3 +
4 + import org.litesoft.commonfoundation.base.*;
5 + import org.litesoft.commonfoundation.typeutils.*;
6 + import org.litesoft.logger.*;
7 + import org.litesoft.peertopeer.*;
8 + import org.litesoft.peertopeer.nonpublic.peermanagement.*;
9 + import org.litesoft.util.*;
10 +
11 + import java.io.*;
12 + import java.net.*;
13 +
14 + public class BroadcastMessageManager implements P2PConstants {
15 + public static final Logger LOGGER = LoggerFactory.getLogger( BroadcastMessageManager.class );
16 +
17 + private static final byte BCA_BYTE = (byte) 255;
18 + private static final byte[] BROADCAST_ADDRESS = new byte[]{BCA_BYTE, BCA_BYTE, BCA_BYTE, BCA_BYTE};
19 +
20 + private String mOurName, mAppGroupID, mAppGroupVersion, mMessagePrefixFilter;
21 + private AppGroupVersionCompatibilityChecker mCompatibilityChecker;
22 +
23 + public BroadcastMessageManager( String pOurName, String pAppGroupID, String pAppGroupVersion, AppGroupVersionCompatibilityChecker pCompatibilityChecker ) {
24 + mOurName = assertNotEmptyAndNoCommas( "OurName", pOurName );
25 + mAppGroupID = assertNotEmptyAndNoCommas( "AppGroupID", pAppGroupID );
26 + mAppGroupVersion = assertNotEmptyAndNoCommas( "AppGroupVersion", pAppGroupVersion );
27 + mCompatibilityChecker = pCompatibilityChecker;
28 + mMessagePrefixFilter = mAppGroupID + ",";
29 + }
30 +
31 + public String getOurName() {
32 + return mOurName;
33 + }
34 +
35 + public synchronized void assert_joinGroup_notCalled() {
36 + if ( mOurServer != null ) {
37 + throw new IllegalStateException( "BroadcastMessageManager has already joined (or attempted to join) Group: " + mAppGroupID );
38 + }
39 + }
40 +
41 + public synchronized void joinGroup( int pServerPort, PeerConnectionHandlerFactory pConnectionHandlerFactory )
42 + throws IOException {
43 + P2Putils.validatePort( pServerPort );
44 + Confirm.isNotNull( "PeerConnectionHandlerFactory", pConnectionHandlerFactory );
45 + assert_joinGroup_notCalled();
46 +
47 + mOurServer = new OurServer( getClass().getName(), pConnectionHandlerFactory );
48 +
49 + // get a datagram socket
50 + DatagramSocket socket = new DatagramSocket();
51 +
52 + try {
53 + socket.setBroadcast( true );
54 +
55 + mOurServer.startListening();
56 +
57 + byte[] buf = createAnnouceMessage( pServerPort );
58 + InetAddress address = InetAddress.getByAddress( BROADCAST_ADDRESS );
59 + DatagramPacket packet = new DatagramPacket( buf, buf.length, address, DISCOVERY_PORT );
60 + socket.send( packet );
61 + }
62 + catch ( NoRouteToHostException e ) {
63 + System.err.println( "*** Unable to Broadcast Our Existence - Proceeding Stand Alone ***" );
64 + mOurServer.stopListening();
65 + }
66 + catch ( IOException e ) {
67 + mOurServer.stopListening();
68 + throw e;
69 + }
70 + finally {
71 + socket.close();
72 + }
73 + }
74 +
75 + private void processAnnounceMessage( String pMessage, InetAddress pInetAddress, PeerConnectionHandlerFactory pConnectionHandlerFactory ) {
76 + if ( !pMessage.startsWith( mMessagePrefixFilter ) ) {
77 + return;
78 + }
79 + int cAt1 = pMessage.indexOf( ',' );
80 + int cAt2 = pMessage.indexOf( ',', cAt1 + 1 );
81 + int cAt3 = pMessage.indexOf( ',', cAt2 + 1 );
82 + if ( (cAt2 != -1) && (cAt3 != -1) ) {
83 + String thierAppGroupVersion = pMessage.substring( cAt1 + 1, cAt2 ).trim();
84 + String thierName = pMessage.substring( cAt2 + 1, cAt3 ).trim();
85 + String thierServerPort = pMessage.substring( cAt3 + 1 ).trim();
86 + if ( (thierAppGroupVersion.length() != 0) && (thierName.length() != 0) && (thierServerPort.length() != 0) ) {
87 + int zPort = parseServerPort( thierServerPort );
88 + if ( zPort != 0 ) {
89 + PeerDefinition zNewPeerDef = new PeerDefinition( thierName, pInetAddress, zPort );
90 + if ( !mAppGroupVersion.equals( thierAppGroupVersion ) &&
91 + ((mCompatibilityChecker == null) || !mCompatibilityChecker.areCompatibile( mAppGroupVersion, thierAppGroupVersion )) ) {
92 + pConnectionHandlerFactory
93 + .handleAnnouncedPeerWithIncompatibleAppGroupVersions( zNewPeerDef, mOurName, mAppGroupVersion, thierAppGroupVersion );
94 + } else {
95 + pConnectionHandlerFactory.handleAnnouncedPeer( zNewPeerDef );
96 + }
97 + return;
98 + }
99 + }
100 + }
101 + LOGGER.error.log( "MalformedBroadcastAnnounceMessage from ", pInetAddress, ":", pMessage );
102 + }
103 +
104 + private byte[] createAnnouceMessage( int pServerPort )
105 + throws UnsupportedEncodingException {
106 + return createAnnouceMessage( mAppGroupID + "," + mAppGroupVersion + "," + mOurName + "," + pServerPort );
107 + }
108 +
109 + private static byte[] createAnnouceMessage( String pMessage ) {
110 + byte[] zSBytes;
111 + try {
112 + zSBytes = pMessage.getBytes( FileUtils.UTF_8 );
113 + }
114 + catch ( UnsupportedEncodingException e ) {
115 + LOGGER.error.log( e ); // shouldn't happen
116 + zSBytes = new byte[0];
117 + }
118 + int rvLength = zSBytes.length + BROADCAST_PREFIX.length;
119 + if ( rvLength > 255 ) {
120 + throw new IllegalArgumentException( "Announce Message too long: " + pMessage );
121 + }
122 + byte[] rv = new byte[rvLength];
123 + int to = 0;
124 + for ( byte b : BROADCAST_PREFIX ) {
125 + rv[to++] = b;
126 + }
127 + for ( byte b : zSBytes ) {
128 + rv[to++] = b;
129 + }
130 + return rv;
131 + }
132 +
133 + private static String parseAnnouceMessage( DatagramPacket packet ) {
134 + int zDataLength = packet.getLength();
135 + if ( zDataLength < MINIMUM_BROADCAST_MESSAGE_LENGTH ) {
136 + return null;
137 + }
138 + byte[] zData = packet.getData();
139 + int zEncodedStringStartsAt = BROADCAST_PREFIX.length;
140 + for ( int i = 0; i < zEncodedStringStartsAt; i++ ) {
141 + if ( zData[i] != BROADCAST_PREFIX[i] ) {
142 + return null;
143 + }
144 + }
145 + String zMessage = null;
146 + try {
147 + zMessage = new String( zData, zEncodedStringStartsAt, zDataLength - zEncodedStringStartsAt, FileUtils.UTF_8 );
148 + }
149 + catch ( UnsupportedEncodingException e ) {
150 + LOGGER.error.log( e ); // shouldn't happen
151 + }
152 + catch ( RuntimeException e ) {
153 + // Not Properly Encoded?
154 + LOGGER.error.log( e );
155 + }
156 + return zMessage;
157 + }
158 +
159 + private OurServer mOurServer = null;
160 +
161 + private class OurServer extends Thread {
162 + private PeerConnectionHandlerFactory mConnectionHandlerFactory;
163 + private DatagramSocket mSocket = null;
164 +
165 + public OurServer( String pName, PeerConnectionHandlerFactory pConnectionHandlerFactory ) {
166 + super( pName );
167 + mConnectionHandlerFactory = pConnectionHandlerFactory;
168 + setDaemon( true );
169 + }
170 +
171 + public synchronized void startListening()
172 + throws SocketException {
173 + mSocket = new DatagramSocket( null );
174 + mSocket.setReuseAddress( true );
175 + mSocket.bind( new InetSocketAddress( (InetAddress) null, DISCOVERY_PORT ) );
176 + start();
177 + }
178 +
179 + public synchronized boolean stopListening() {
180 + DatagramSocket zSocket = mSocket;
181 + mSocket = null;
182 + if ( zSocket == null ) {
183 + return false;
184 + }
185 + zSocket.close();
186 + return true;
187 + }
188 +
189 + private synchronized DatagramSocket stillListening() {
190 + return mSocket;
191 + }
192 +
193 + @Override
194 + public void run() {
195 + for ( DatagramSocket zSocket; null != (zSocket = stillListening()); ) {
196 + DatagramPacket packet = new DatagramPacket( new byte[256], 256 );
197 + try {
198 + // receive request
199 + zSocket.receive( packet );
200 + }
201 + catch ( IOException e ) {
202 + if ( stopListening() ) {
203 + LOGGER.error.log( e );
204 + }
205 + return;
206 + }
207 + try {
208 + String zMessage = parseAnnouceMessage( packet );
209 + if ( zMessage != null ) {
210 + processAnnounceMessage( zMessage, packet.getAddress(), mConnectionHandlerFactory );
211 + }
212 + }
213 + catch ( RuntimeException e ) {
214 + LOGGER.error.log( e );
215 + }
216 + }
217 + }
218 + }
219 +
220 + private static String assertNotEmptyAndNoCommas( String pErrorMessage, String pStringToAssert ) {
221 + pStringToAssert = Confirm.significant( pErrorMessage, pStringToAssert );
222 + if ( pStringToAssert.contains( "," ) ) {
223 + Strings.error( "String", pErrorMessage, " is not allowed to contain any Commas (',')" );
224 + }
225 +
226 + return pStringToAssert;
227 + }
228 +
229 + private static int parseServerPort( String pThierServerPort ) {
230 + try {
231 + return P2Putils.validatePort( Integer.parseInt( pThierServerPort ) );
232 + }
233 + catch ( RuntimeException e ) {
234 + return 0;
235 + }
236 + }
237 + }