Subversion Repository Public Repository

litesoft

Diff Revisions 947 vs 948 for /trunk/Java/GWT/OldClient/src/org/litesoft/GWT/eventbus/client/nonpublic/StatePeerSupportingEventBusRegular.java

Diff revisions: vs.
  @@ -1,18 +1,16 @@
1 1 // This Source Code is in the Public Domain per: http://unlicense.org
2 2 package org.litesoft.GWT.eventbus.client.nonpublic;
3 3
4 - import org.litesoft.commonfoundation.issues.*;
5 - import org.litesoft.commonfoundation.typeutils.*;
6 -
7 - import java.util.*;
8 -
9 4 import org.litesoft.GWT.eventbus.client.*;
10 5 import org.litesoft.GWT.eventbus.client.rpc.*;
6 + import org.litesoft.commonfoundation.issues.*;
11 7 import org.litesoft.commonfoundation.typeutils.Objects;
8 + import org.litesoft.commonfoundation.typeutils.*;
12 9 import org.litesoft.logger.*;
13 10
14 - public class StatePeerSupportingEventBusRegular extends StatePeerSupportingEventBus
15 - {
11 + import java.util.*;
12 +
13 + public class StatePeerSupportingEventBusRegular extends StatePeerSupportingEventBus {
16 14 private final PeerInterestManager mPeerInterestManager;
17 15 private final SubscriptionManager mSubscriptionManager;
18 16 private final EventBus mEventBus;
  @@ -27,8 +25,7 @@
27 25 IProcessingManager pProcessingManager,
28 26 RemotePeerService pRemotePeerService,
29 27 EventBusMessageCollector pCollector,
30 - SavedEventsExpectingResponseTracker pSavedEventsExpectingResponseTracker )
31 - {
28 + SavedEventsExpectingResponseTracker pSavedEventsExpectingResponseTracker ) {
32 29 super( pLogger, pPeerSupportingEventBus, pSavedEventsExpectingResponseTracker );
33 30
34 31 mProcessingManager = pProcessingManager;
  @@ -49,8 +46,7 @@
49 46 public StatePeerSupportingEventBusRegular( Logger pLogger, PeerSupportingEventBus pPeerSupportingEventBus,
50 47 EventConstrictor pEventConstrictor, EventBus pEventBus,
51 48 IProcessingManager pProcessingManager,
52 - RemotePeerService pRemotePeerService )
53 - {
49 + RemotePeerService pRemotePeerService ) {
54 50 this( pLogger, pPeerSupportingEventBus, //
55 51 pEventConstrictor, pEventBus, //
56 52 pProcessingManager, //
  @@ -59,14 +55,12 @@
59 55 new SavedEventsExpectingResponseTracker( pLogger ) );
60 56 }
61 57
62 - public void disposing()
63 - {
58 + public void disposing() {
64 59 mRemotePeerService.stopUnrequestedMessages();
65 60 mProcessingManager.waitTillNoOthers();
66 61 }
67 62
68 - public synchronized void dispose()
69 - {
63 + public synchronized void dispose() {
70 64 mPeerInterestManager.clear();
71 65 mSubscriptionManager.clear();
72 66 clearPeerService();
  @@ -74,37 +68,27 @@
74 68 }
75 69
76 70 public IStatePeerSupportingEventBus suspendedToRegular( EventConstrictor pEventConstrictor,
77 - EventBus pEventBus )
78 - {
71 + EventBus pEventBus ) {
79 72 throw new IllegalStateException( "Not Suspended" );
80 73 }
81 74
82 - public void publishExpectingResponse( EventPackage pPackage )
83 - {
75 + public void publishExpectingResponse( EventPackage pPackage ) {
84 76 String sourceName = pPackage.getSourceName();
85 - synchronized ( this )
86 - {
87 - if ( mSubscriptionManager.hasPeerAcknowledged( sourceName ) )
88 - {
77 + synchronized ( this ) {
78 + if ( mSubscriptionManager.hasPeerAcknowledged( sourceName ) ) {
89 79 getEventBusMessageCollector().addPublishedEvent( pPackage );
90 - }
91 - else
92 - {
80 + } else {
93 81 mSavedEventsExpectingResponseTracker.add( pPackage );
94 82 }
95 83 }
96 84 releaseCollected();
97 85 }
98 86
99 - public boolean releaseCollected()
100 - {
87 + public boolean releaseCollected() {
101 88 boolean rv = true;
102 - if ( mProcessingManager.incThreadsAndIndicateIfWeAreFirst() )
103 - {
104 - for ( EventBusMessageCollector collector; null != (collector = grabCollector()); )
105 - {
106 - if ( !process( collector ) )
107 - {
89 + if ( mProcessingManager.incThreadsAndIndicateIfWeAreFirst() ) {
90 + for ( EventBusMessageCollector collector; null != (collector = grabCollector()); ) {
91 + if ( !process( collector ) ) {
108 92 rv = false;
109 93 }
110 94 }
  @@ -113,51 +97,41 @@
113 97 return rv;
114 98 }
115 99
116 - protected synchronized EventBusMessageCollector getEventBusMessageCollector()
117 - {
100 + protected synchronized EventBusMessageCollector getEventBusMessageCollector() {
118 101 return mCollector;
119 102 }
120 103
121 - private synchronized EventBusMessageCollector grabCollector()
122 - {
104 + private synchronized EventBusMessageCollector grabCollector() {
123 105 EventBusMessageCollector rv = null;
124 - if ( !mCollector.isEmpty() )
125 - {
106 + if ( !mCollector.isEmpty() ) {
126 107 rv = mCollector;
127 108 mCollector = new EventBusMessageCollector( mLogger );
128 109 }
129 110 return rv;
130 111 }
131 112
132 - private synchronized void clearPeerService()
133 - {
113 + private synchronized void clearPeerService() {
134 114 mRemotePeerService = mRemotePeerService.dispose();
135 115 }
136 116
137 - private synchronized RemotePeerService getRemotePeerService()
138 - {
117 + private synchronized RemotePeerService getRemotePeerService() {
139 118 return mRemotePeerService;
140 119 }
141 120
142 - protected boolean process( EventBusMessageCollector pCollector )
143 - {
121 + protected boolean process( EventBusMessageCollector pCollector ) {
144 122 List zAddPeerInterests4Remote = new ArrayList();
145 123 List zRemovePeerInterests4Remote = new ArrayList();
146 124 List zEvents4Remote = new ArrayList();
147 125
148 - for ( Iterator it = pCollector.getSubscribes(); it.hasNext(); )
149 - {
126 + for ( Iterator it = pCollector.getSubscribes(); it.hasNext(); ) {
150 127 EventSubscriptionStructure member = (EventSubscriptionStructure) it.next();
151 - if ( mSubscriptionManager.subscribe( member.getSubscribeWith(), member.getEventListener() ) )
152 - {
128 + if ( mSubscriptionManager.subscribe( member.getSubscribeWith(), member.getEventListener() ) ) {
153 129 zAddPeerInterests4Remote.add( member.getSubscribeWith() );
154 130 }
155 131 }
156 - for ( Iterator it = pCollector.getUnsubscribes(); it.hasNext(); )
157 - {
132 + for ( Iterator it = pCollector.getUnsubscribes(); it.hasNext(); ) {
158 133 EventSubscriptionStructure member = (EventSubscriptionStructure) it.next();
159 - if ( mSubscriptionManager.unsubscribe( member.getSubscribeWith(), member.getEventListener() ) )
160 - {
134 + if ( mSubscriptionManager.unsubscribe( member.getSubscribeWith(), member.getEventListener() ) ) {
161 135 zRemovePeerInterests4Remote.add( member.getSubscribeWith() );
162 136 }
163 137 }
  @@ -167,16 +141,12 @@
167 141
168 142 mPeerInterestManager.remove( pCollector.getRemoveRemotePeerInterests() );
169 143
170 - for ( Iterator it = pCollector.getFromRemotePeerEvents(); it.hasNext(); )
171 - {
144 + for ( Iterator it = pCollector.getFromRemotePeerEvents(); it.hasNext(); ) {
172 145 EventPackage member = (EventPackage) it.next();
173 146 String error = mEventConstrictor.isLocalDistributionFromPeerOK( member );
174 - if ( error == null )
175 - {
147 + if ( error == null ) {
176 148 distributeToLocals( member );
177 - }
178 - else
179 - {
149 + } else {
180 150 mEventBus.handleUnexpectedProblem( "EventConstrictor:" + error,
181 151 new Problem( "DistributionNotAllowed", //
182 152 error, //
  @@ -186,20 +156,17 @@
186 156 }
187 157 }
188 158
189 - for ( Iterator it = pCollector.getPublishedEvents(); it.hasNext(); )
190 - {
159 + for ( Iterator it = pCollector.getPublishedEvents(); it.hasNext(); ) {
191 160 EventPackage member = (EventPackage) it.next();
192 161 publishFromLocal( zEvents4Remote, member );
193 162 }
194 163
195 - for ( Iterator it = pCollector.getFromRemoteAckPeerInterests(); it.hasNext(); )
196 - {
164 + for ( Iterator it = pCollector.getFromRemoteAckPeerInterests(); it.hasNext(); ) {
197 165 peerHasAcked( it.next().toString() );
198 166 }
199 167
200 168 if ( zAddPeerInterests4Remote.isEmpty() && zRemovePeerInterests4Remote.isEmpty() &&
201 - zEvents4Remote.isEmpty() && zAddedRemotePeerInterests.isEmpty() )
202 - {
169 + zEvents4Remote.isEmpty() && zAddedRemotePeerInterests.isEmpty() ) {
203 170 return true;
204 171 }
205 172
  @@ -210,46 +177,36 @@
210 177 return getRemotePeerService().forwardTo( csp );
211 178 }
212 179
213 - private ChannelEventPackage[] toChannelEventArray( List pList )
214 - {
180 + private ChannelEventPackage[] toChannelEventArray( List pList ) {
215 181 ChannelEventPackage[] rv = new ChannelEventPackage[pList.size()];
216 - for ( int i = 0; i < pList.size(); i++ )
217 - {
182 + for ( int i = 0; i < pList.size(); i++ ) {
218 183 rv[i] = (ChannelEventPackage) pList.get( i );
219 184 }
220 185 return rv;
221 186 }
222 187
223 - private String[] toStringArray( Collection pCollection )
224 - {
188 + private String[] toStringArray( Collection pCollection ) {
225 189 return Strings.toArray( pCollection );
226 190 }
227 191
228 - private void publishFromLocal( List pPackages4Remote, EventPackage pPackage )
229 - {
192 + private void publishFromLocal( List pPackages4Remote, EventPackage pPackage ) {
230 193 distributeToLocals( pPackage );
231 - if ( mPeerInterestManager.isPeerInterested( pPackage ) )
232 - {
194 + if ( mPeerInterestManager.isPeerInterested( pPackage ) ) {
233 195 pPackages4Remote.add( pPackage );
234 196 }
235 197 }
236 198
237 - private void distributeToLocals( EventPackage pPackage )
238 - {
199 + private void distributeToLocals( EventPackage pPackage ) {
239 200 EventPackageListener[] listeners = mSubscriptionManager.getListenersFor( pPackage.getTargetName() );
240 - if ( listeners != null )
241 - {
201 + if ( listeners != null ) {
242 202 mLogger.trace.log( "(distributeToLocals):", pPackage );
243 203 //noinspection ForLoopReplaceableByForEach
244 - for ( int i = 0; i < listeners.length; i++ )
245 - {
204 + for ( int i = 0; i < listeners.length; i++ ) {
246 205 EventPackageListener zListener = listeners[i];
247 - try
248 - {
206 + try {
249 207 zListener.packageReceivedVia( pPackage, mEventBus );
250 208 }
251 - catch ( RuntimeException e )
252 - {
209 + catch ( RuntimeException e ) {
253 210 String zListenerType = Objects.classNameOf( zListener );
254 211 mLogger.error.log( e, zListenerType, pPackage );
255 212 // Note: GWT does not provide stack traces in client mode
  @@ -261,16 +218,13 @@
261 218 }
262 219 }
263 220
264 - public String toString()
265 - {
221 + public String toString() {
266 222 return super.toString() + "Regular";
267 223 }
268 224
269 - private void peerHasAcked( String pTargetName )
270 - {
225 + private void peerHasAcked( String pTargetName ) {
271 226 EventPackage[] packages;
272 - synchronized ( this )
273 - {
227 + synchronized ( this ) {
274 228 mSubscriptionManager.setPeerAcknowledged( pTargetName );
275 229 packages = mSavedEventsExpectingResponseTracker.getAndReleaseEventsFor( pTargetName );
276 230 }