10
10
import java .util .Map ;
11
11
import java .util .Map .Entry ;
12
12
import java .util .Queue ;
13
+ import java .util .concurrent .ArrayBlockingQueue ;
14
+ import java .util .concurrent .BlockingQueue ;
13
15
import java .util .concurrent .ConcurrentHashMap ;
14
16
import java .util .concurrent .ConcurrentLinkedQueue ;
15
17
import java .util .concurrent .ExecutorService ;
23
25
24
26
public class talk extends AbstractApplication {
25
27
26
- private static final long TIMEOUT = 200 ;
28
+ private static final long TIMEOUT = 1 ;
29
+ private static final int DEFAULT_POOL_SIZE = 3 ;
30
+ protected static final int DEFAULT_MESSAGE_POOL_SIZE = 10 ;
31
+ protected final Map <String , BlockingQueue <Builder >> meetings = new ConcurrentHashMap <String , BlockingQueue <Builder >>();
27
32
protected final Map <String , Queue <Builder >> list = new ConcurrentHashMap <String , Queue <Builder >>();
28
- protected final Map <String , Queue <Builder >> meetings = new ConcurrentHashMap <String , Queue <Builder >>();
29
33
protected final Map <String , List <String >> sessions = new ConcurrentHashMap <String , List <String >>();
30
- private final ExecutorService service = Executors . newFixedThreadPool ( 3 ) ;
34
+ private ExecutorService service ;
31
35
32
36
@ Override
33
37
public void init () {
@@ -36,23 +40,25 @@ public void init() {
36
40
this .setAction ("talk/version" , "version" );
37
41
this .setAction ("talk/testing" , "testing" );
38
42
39
- Runtime .getRuntime ().addShutdownHook (new Thread (new Runnable () {
40
- @ Override
41
- public void run () {
42
- service .shutdown ();
43
- while (true ) {
44
- try {
45
- System .out .println ("Waiting for the service to terminate..." );
46
- if (service .awaitTermination (5 , TimeUnit .SECONDS )) {
47
- System .out .println ("Service will be terminated soon." );
43
+ if (this .service != null ) {
44
+ Runtime .getRuntime ().addShutdownHook (new Thread (new Runnable () {
45
+ @ Override
46
+ public void run () {
47
+ service .shutdown ();
48
+ while (true ) {
49
+ try {
50
+ System .out .println ("Waiting for the service to terminate..." );
51
+ if (service .awaitTermination (5 , TimeUnit .SECONDS )) {
52
+ System .out .println ("Service will be terminated soon." );
48
53
break ;
49
- }
50
- } catch (InterruptedException e ) {
51
- e .printStackTrace ();
52
- }
53
- }
54
- }
55
- }));
54
+ }
55
+ } catch (InterruptedException e ) {
56
+ e .printStackTrace ();
57
+ }
58
+ }
59
+ }
60
+ }));
61
+ }
56
62
}
57
63
58
64
/**
@@ -86,37 +92,38 @@ public String save(Object meetingCode, String sessionId, String message) {
86
92
* @return builder
87
93
*/
88
94
public final String save (final Object meetingCode , final Builder builder ) {
89
- final Queue <Builder > messages ;
90
- synchronized (this .meetings ) {
91
- if (this .meetings .get (meetingCode ) == null ) {
92
- this .meetings .put (meetingCode .toString (), new ConcurrentLinkedQueue <Builder >());
93
- }
95
+ BlockingQueue <Builder > messages ;
96
+ if ((messages = this .meetings .get (meetingCode )) == null ) {
97
+ this .meetings .put (meetingCode .toString (), messages = new ArrayBlockingQueue <Builder >(DEFAULT_MESSAGE_POOL_SIZE ));
98
+ }
94
99
95
- messages = this .meetings .get (meetingCode );
96
- messages .add (builder );
97
- this .meetings .notifyAll ();
100
+ try {
101
+ messages .put (builder );
102
+ } catch (InterruptedException e ) {
103
+ e .printStackTrace ();
98
104
}
99
105
100
- service .execute (new Runnable (){
106
+ this . getService () .execute (new Runnable (){
101
107
@ Override
102
108
public void run () {
103
- synchronized (talk .this .meetings ) {
104
109
Builder message ;
105
110
do {
106
111
try {
107
- talk . this . meetings . wait (TIMEOUT );
112
+ Thread . sleep (TIMEOUT );
108
113
} catch (InterruptedException e ) {
109
- e .printStackTrace ();
114
+ e .printStackTrace ();
110
115
}
111
116
} while (talk .this .meetings .get (meetingCode ) == null || (message = talk .this .meetings .get (meetingCode ).poll ()) == null );
112
-
113
117
talk .this .copy (meetingCode , message );
114
- }
115
118
}
116
119
});
117
120
return builder .toString ();
118
121
}
119
122
123
+ private ExecutorService getService () {
124
+ return this .service !=null ? this .service : Executors .newFixedThreadPool (DEFAULT_POOL_SIZE );
125
+ }
126
+
120
127
/**
121
128
* Poll message from the messages of the session specified sessionId.
122
129
* @param sessionId
@@ -126,19 +133,16 @@ public void run() {
126
133
*/
127
134
public final String update (final String sessionId ) throws ApplicationException , IOException {
128
135
Builder message ;
129
- Queue <Builder > messages ;
130
- synchronized (this .list ) {
131
- messages = this .list .get (sessionId );
132
- while ((message = messages .poll ()) == null ) {
133
- try {
134
- this .list .wait (TIMEOUT );
135
- } catch (InterruptedException e ) {
136
- throw new ApplicationException (e .getMessage (), e );
137
- }
136
+ Queue <Builder > messages = this .list .get (sessionId );
137
+ while ((message = messages .poll ()) == null ) {
138
+ try {
139
+ Thread .sleep (TIMEOUT );
140
+ } catch (InterruptedException e ) {
141
+ throw new ApplicationException (e .getMessage (), e );
138
142
}
139
-
140
- return message .toString ();
141
143
}
144
+
145
+ return message .toString ();
142
146
}
143
147
144
148
/**
@@ -156,22 +160,17 @@ protected String filter(String text) {
156
160
* @param builder
157
161
*/
158
162
private final void copy (Object meetingCode , Builder builder ) {
159
- synchronized (this .list ) {
160
- final Collection <Entry <String , Queue <Builder >>> set = list .entrySet ();
163
+ final Collection <Entry <String , Queue <Builder >>> set = this .list .entrySet ();
161
164
final Iterator <Entry <String , Queue <Builder >>> iterator = set .iterator ();
162
- final List <String > meeting_session ;
163
- if ((meeting_session = this .sessions .get (meetingCode )) != null ) {
165
+ final List <String > _sessions ;
166
+ if ((_sessions = this .sessions .get (meetingCode )) != null ) {
164
167
while (iterator .hasNext ()) {
165
- Entry <String , Queue <Builder >> e = iterator .next ();
166
- if (meeting_session .contains (e .getKey ())) {
167
- e .getValue ().add (builder );
168
- this .list .notifyAll ();
168
+ Entry <String , Queue <Builder >> list = iterator .next ();
169
+ if (_sessions .contains (list .getKey ())) {
170
+ list .getValue ().add (builder );
169
171
}
170
172
}
171
173
}
172
- else
173
- this .list .notifyAll ();
174
- }
175
174
}
176
175
177
176
@ Override
@@ -188,7 +187,7 @@ public String version() {
188
187
* @throws ApplicationException
189
188
*/
190
189
public boolean testing (final int n ) throws ApplicationException {
191
- this .meetings .put ("[M001]" , new ConcurrentLinkedQueue <Builder >());
190
+ this .meetings .put ("[M001]" , new ArrayBlockingQueue <Builder >(DEFAULT_MESSAGE_POOL_SIZE ));
192
191
this .list .put ("{A}" , new ConcurrentLinkedQueue <Builder >());
193
192
this .list .put ("{B}" , new ConcurrentLinkedQueue <Builder >());
194
193
@@ -197,7 +196,7 @@ public boolean testing(final int n) throws ApplicationException {
197
196
sess .add ("{B}" );
198
197
this .sessions .put ("[M001]" , sess );
199
198
200
- service .execute (new Runnable (){
199
+ this . getService () .execute (new Runnable (){
201
200
@ Override
202
201
public void run () {
203
202
int i =0 ;
@@ -215,7 +214,7 @@ public void run() {
215
214
}
216
215
});
217
216
218
- service .execute (new Runnable (){
217
+ this . getService () .execute (new Runnable (){
219
218
@ Override
220
219
public void run () {
221
220
int i =0 ;
@@ -233,7 +232,7 @@ public void run() {
233
232
}
234
233
});
235
234
236
- service .execute (new Runnable (){
235
+ this . getService () .execute (new Runnable (){
237
236
@ Override
238
237
public void run () {
239
238
// TODO Auto-generated method stub
@@ -252,7 +251,7 @@ public void run() {
252
251
}
253
252
});
254
253
255
- service .execute (new Runnable (){
254
+ this . getService () .execute (new Runnable (){
256
255
@ Override
257
256
public void run () {
258
257
// TODO Auto-generated method stub
0 commit comments