Subversion Repository Public Repository

litesoft

Diff Revisions 949 vs 950 for /trunk/Java/GWT/OldServer/src/org/litesoft/GWT/eventbus/server/nonpublic/ServerSideRemotePeerService.java

Diff revisions: vs.
  @@ -1,145 +1,145 @@
1 - // This Source Code is in the Public Domain per: http://unlicense.org
2 - package org.litesoft.GWT.eventbus.server.nonpublic;
3 -
4 - import org.litesoft.GWT.eventbus.client.nonpublic.*;
5 - import org.litesoft.GWT.eventbus.client.rpc.*;
6 - import org.litesoft.commonfoundation.base.*;
7 - import org.litesoft.logger.*;
8 -
9 - import java.util.*;
10 -
11 - public class ServerSideRemotePeerService implements RemotePeerService {
12 - private final MillisecTimeSource mMillisecTimeSource;
13 - private final Set<String> mAddPeerInterestsForClient = new HashSet<String>();
14 - private final Set<String> mRemovePeerInterestsForClient = new HashSet<String>();
15 - private final List<ChannelEventPackage> mChannelEventsForClient = new ArrayList<ChannelEventPackage>();
16 - private final Set<String> mAckPeerInterestsForClient = new HashSet<String>();
17 -
18 - private final Logger mLogger;
19 -
20 - private Long mGiveUpAfter = null;
21 - private PeerSupportingEventBus mServerEventBus;
22 -
23 - public ServerSideRemotePeerService( MillisecTimeSource pMillisecTimeSource, Logger pLogger, PeerSupportingEventBus pServerEventBus ) {
24 - mMillisecTimeSource = MillisecTimeSource.deNull( pMillisecTimeSource );
25 - mLogger = pLogger;
26 - mServerEventBus = (pServerEventBus != null) ? //
27 - pServerEventBus : //
28 - PeerSupportingEventBus.Null.INSTANCE;
29 - }
30 -
31 - public ServerSideRemotePeerService( Logger pLogger, PeerSupportingEventBus pServerEventBus ) {
32 - this( null, pLogger, pServerEventBus );
33 - }
34 -
35 - private synchronized void setGiveUpAfter( int pPollMillisecs ) {
36 - if ( pPollMillisecs > 0 ) {
37 - mGiveUpAfter = mMillisecTimeSource.now() + pPollMillisecs + 60000L * 5; // 5 minutes
38 - }
39 - }
40 -
41 - private boolean okToCollect() // synchronized by forwardTo()
42 - {
43 - return (mGiveUpAfter == null) || (mMillisecTimeSource.now() < mGiveUpAfter);
44 - }
45 -
46 - private synchronized PeerSupportingEventBus getServerEventBus() {
47 - return mServerEventBus;
48 - }
49 -
50 - private synchronized void clrServerEventBus() {
51 - mServerEventBus = PeerSupportingEventBus.Null.INSTANCE;
52 - }
53 -
54 - public void stopUnrequestedMessages() {
55 - }
56 -
57 - public synchronized RemotePeerService dispose() {
58 - mLogger.trace.log( "Dispose: ServerSideRemotePeerService" );
59 - mAddPeerInterestsForClient.clear();
60 - mRemovePeerInterestsForClient.clear();
61 - mChannelEventsForClient.clear();
62 - mServerEventBus = PeerSupportingEventBus.Null.INSTANCE;
63 - return RemotePeerService.Null.INSTANCE;
64 - }
65 -
66 - public synchronized boolean forwardTo( ChannelServicePackage pToClient ) {
67 - if ( pToClient != null ) {
68 - if ( pToClient.anyAddPeerInterests() ) {
69 - for ( String peerInterest : pToClient.getAddPeerInterests() ) {
70 - mAddPeerInterestsForClient.add( peerInterest );
71 - }
72 - }
73 - if ( pToClient.anyRemovePeerInterests() ) {
74 - for ( String peerInterest : pToClient.getRemovePeerInterests() ) {
75 - mRemovePeerInterestsForClient.add( peerInterest );
76 - }
77 - }
78 - if ( pToClient.anyEvents() ) {
79 - for ( ChannelEventPackage event : pToClient.getEvents() ) {
80 - mChannelEventsForClient.add( event );
81 - }
82 - }
83 - if ( pToClient.anyAckPeerInterests() ) {
84 - for ( String ackInterest : pToClient.getAckPeerInterests() ) {
85 - mAckPeerInterestsForClient.add( ackInterest );
86 - }
87 - }
88 - }
89 - return okToCollect();
90 - }
91 -
92 - public ChannelServicePackage processClientRequest( int pPollMillisecs, ChannelServicePackage pFromClient ) {
93 - setGiveUpAfter( pPollMillisecs );
94 - PeerSupportingEventBus eventBus = getServerEventBus();
95 - boolean peerOK;
96 - if ( (pFromClient != null) && pFromClient.isNotEmpty() ) {
97 - peerOK = eventBus.propagateFromRemotePeerService( pFromClient );
98 - } else {
99 - peerOK = eventBus.ping();
100 - }
101 - if ( !peerOK ) {
102 - clrServerEventBus();
103 - }
104 - synchronized ( this ) {
105 - return new ChannelServicePackage( extractAddPeerInterestsForClient(), //
106 - extractRemovePeerInterestsForClient(), //
107 - extractChannelEventsForClient(), //
108 - extractAckPeerInterestsForClient() );
109 - }
110 - }
111 -
112 - public String toString() {
113 - return "ServerSideRemotePeerService";
114 - }
115 -
116 - private String[] extractAddPeerInterestsForClient() // synchronized in processClientRequest()
117 - {
118 - String[] rv = mAddPeerInterestsForClient.toArray( new String[mAddPeerInterestsForClient.size()] );
119 - mAddPeerInterestsForClient.clear();
120 - return rv;
121 - }
122 -
123 - private String[] extractRemovePeerInterestsForClient() // synchronized in processClientRequest()
124 - {
125 - String[] rv =
126 - mRemovePeerInterestsForClient.toArray( new String[mRemovePeerInterestsForClient.size()] );
127 - mRemovePeerInterestsForClient.clear();
128 - return rv;
129 - }
130 -
131 - private ChannelEventPackage[] extractChannelEventsForClient() // synchronized in processClientRequest()
132 - {
133 - ChannelEventPackage[] rv =
134 - mChannelEventsForClient.toArray( new ChannelEventPackage[mChannelEventsForClient.size()] );
135 - mChannelEventsForClient.clear();
136 - return rv;
137 - }
138 -
139 - private String[] extractAckPeerInterestsForClient() // synchronized in processClientRequest()
140 - {
141 - String[] rv = mAckPeerInterestsForClient.toArray( new String[mAckPeerInterestsForClient.size()] );
142 - mAckPeerInterestsForClient.clear();
143 - return rv;
144 - }
145 - }
1 + // This Source Code is in the Public Domain per: http://unlicense.org
2 + package org.litesoft.GWT.eventbus.server.nonpublic;
3 +
4 + import org.litesoft.GWT.eventbus.client.nonpublic.*;
5 + import org.litesoft.GWT.eventbus.client.rpc.*;
6 + import org.litesoft.commonfoundation.base.*;
7 + import org.litesoft.logger.*;
8 +
9 + import java.util.*;
10 +
11 + public class ServerSideRemotePeerService implements RemotePeerService {
12 + private final MillisecTimeSource mMillisecTimeSource;
13 + private final Set<String> mAddPeerInterestsForClient = new HashSet<String>();
14 + private final Set<String> mRemovePeerInterestsForClient = new HashSet<String>();
15 + private final List<ChannelEventPackage> mChannelEventsForClient = new ArrayList<ChannelEventPackage>();
16 + private final Set<String> mAckPeerInterestsForClient = new HashSet<String>();
17 +
18 + private final Logger mLogger;
19 +
20 + private Long mGiveUpAfter = null;
21 + private PeerSupportingEventBus mServerEventBus;
22 +
23 + public ServerSideRemotePeerService( MillisecTimeSource pMillisecTimeSource, Logger pLogger, PeerSupportingEventBus pServerEventBus ) {
24 + mMillisecTimeSource = MillisecTimeSource.deNull( pMillisecTimeSource );
25 + mLogger = pLogger;
26 + mServerEventBus = (pServerEventBus != null) ? //
27 + pServerEventBus : //
28 + PeerSupportingEventBus.Null.INSTANCE;
29 + }
30 +
31 + public ServerSideRemotePeerService( Logger pLogger, PeerSupportingEventBus pServerEventBus ) {
32 + this( null, pLogger, pServerEventBus );
33 + }
34 +
35 + private synchronized void setGiveUpAfter( int pPollMillisecs ) {
36 + if ( pPollMillisecs > 0 ) {
37 + mGiveUpAfter = mMillisecTimeSource.now() + pPollMillisecs + 60000L * 5; // 5 minutes
38 + }
39 + }
40 +
41 + private boolean okToCollect() // synchronized by forwardTo()
42 + {
43 + return (mGiveUpAfter == null) || (mMillisecTimeSource.now() < mGiveUpAfter);
44 + }
45 +
46 + private synchronized PeerSupportingEventBus getServerEventBus() {
47 + return mServerEventBus;
48 + }
49 +
50 + private synchronized void clrServerEventBus() {
51 + mServerEventBus = PeerSupportingEventBus.Null.INSTANCE;
52 + }
53 +
54 + public void stopUnrequestedMessages() {
55 + }
56 +
57 + public synchronized RemotePeerService dispose() {
58 + mLogger.trace.log( "Dispose: ServerSideRemotePeerService" );
59 + mAddPeerInterestsForClient.clear();
60 + mRemovePeerInterestsForClient.clear();
61 + mChannelEventsForClient.clear();
62 + mServerEventBus = PeerSupportingEventBus.Null.INSTANCE;
63 + return RemotePeerService.Null.INSTANCE;
64 + }
65 +
66 + public synchronized boolean forwardTo( ChannelServicePackage pToClient ) {
67 + if ( pToClient != null ) {
68 + if ( pToClient.anyAddPeerInterests() ) {
69 + for ( String peerInterest : pToClient.getAddPeerInterests() ) {
70 + mAddPeerInterestsForClient.add( peerInterest );
71 + }
72 + }
73 + if ( pToClient.anyRemovePeerInterests() ) {
74 + for ( String peerInterest : pToClient.getRemovePeerInterests() ) {
75 + mRemovePeerInterestsForClient.add( peerInterest );
76 + }
77 + }
78 + if ( pToClient.anyEvents() ) {
79 + for ( ChannelEventPackage event : pToClient.getEvents() ) {
80 + mChannelEventsForClient.add( event );
81 + }
82 + }
83 + if ( pToClient.anyAckPeerInterests() ) {
84 + for ( String ackInterest : pToClient.getAckPeerInterests() ) {
85 + mAckPeerInterestsForClient.add( ackInterest );
86 + }
87 + }
88 + }
89 + return okToCollect();
90 + }
91 +
92 + public ChannelServicePackage processClientRequest( int pPollMillisecs, ChannelServicePackage pFromClient ) {
93 + setGiveUpAfter( pPollMillisecs );
94 + PeerSupportingEventBus eventBus = getServerEventBus();
95 + boolean peerOK;
96 + if ( (pFromClient != null) && pFromClient.isNotEmpty() ) {
97 + peerOK = eventBus.propagateFromRemotePeerService( pFromClient );
98 + } else {
99 + peerOK = eventBus.ping();
100 + }
101 + if ( !peerOK ) {
102 + clrServerEventBus();
103 + }
104 + synchronized ( this ) {
105 + return new ChannelServicePackage( extractAddPeerInterestsForClient(), //
106 + extractRemovePeerInterestsForClient(), //
107 + extractChannelEventsForClient(), //
108 + extractAckPeerInterestsForClient() );
109 + }
110 + }
111 +
112 + public String toString() {
113 + return "ServerSideRemotePeerService";
114 + }
115 +
116 + private String[] extractAddPeerInterestsForClient() // synchronized in processClientRequest()
117 + {
118 + String[] rv = mAddPeerInterestsForClient.toArray( new String[mAddPeerInterestsForClient.size()] );
119 + mAddPeerInterestsForClient.clear();
120 + return rv;
121 + }
122 +
123 + private String[] extractRemovePeerInterestsForClient() // synchronized in processClientRequest()
124 + {
125 + String[] rv =
126 + mRemovePeerInterestsForClient.toArray( new String[mRemovePeerInterestsForClient.size()] );
127 + mRemovePeerInterestsForClient.clear();
128 + return rv;
129 + }
130 +
131 + private ChannelEventPackage[] extractChannelEventsForClient() // synchronized in processClientRequest()
132 + {
133 + ChannelEventPackage[] rv =
134 + mChannelEventsForClient.toArray( new ChannelEventPackage[mChannelEventsForClient.size()] );
135 + mChannelEventsForClient.clear();
136 + return rv;
137 + }
138 +
139 + private String[] extractAckPeerInterestsForClient() // synchronized in processClientRequest()
140 + {
141 + String[] rv = mAckPeerInterestsForClient.toArray( new String[mAckPeerInterestsForClient.size()] );
142 + mAckPeerInterestsForClient.clear();
143 + return rv;
144 + }
145 + }