Jars required:
- hamcrest-all-1.1.jar
- junit.jar
- netty-3.6.3.Final.jar
org.freeswitch.esl.client-0.9.3-SNAPSHOT.jar
package net.freeswitch;
/*
* Copyright 2010 david varnes.
*
* Licensed under the Apache License, version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Map.Entry;
import java.util.UUID;
import org.freeswitch.esl.client.IEslEventListener;
import org.freeswitch.esl.client.inbound.InboundConnectionFailure;
import org.freeswitch.esl.client.transport.CommandResponse;
import org.freeswitch.esl.client.transport.SendMsg;
import org.freeswitch.esl.client.transport.event.EslEvent;
import org.freeswitch.esl.client.transport.message.EslHeaders.Name;
import org.freeswitch.esl.client.transport.message.EslMessage;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientTest
{
private final Logger log = LoggerFactory.getLogger( this.getClass() );
private String host = "127.0.0.1";
private int port = 8021;
private String password = "ClueCon";
@Test
public void do_connect() throws InterruptedException
{
Client client = new Client();
client.addEventListener( new IEslEventListener()
{
public void eventReceived( EslEvent event )
{
log.info( "Event received [{}]", event );
}
public void backgroundJobResultReceived( EslEvent event )
{
log.info( "Background job result received [{}]", event );
}
} );
log.info( "Client connecting .." );
try
{
client.connect( host, port, password, 20 );
}
catch ( InboundConnectionFailure e )
{
log.error( "Connect failed", e );
return;
}
log.info( "Client connected .." );
// client.setEventSubscriptions( "plain", "heartbeat CHANNEL_CREATE CHANNEL_DESTROY BACKGROUND_JOB" );
client.setEventSubscriptions( "plain", "all" );
client.addEventFilter( "Event-Name", "heartbeat" );
client.cancelEventSubscriptions();
client.setEventSubscriptions( "plain", "all" );
client.addEventFilter( "Event-Name", "heartbeat" );
client.addEventFilter( "Event-Name", "channel_create" );
client.addEventFilter( "Event-Name", "background_job" );
client.sendSyncApiCommand( "echo", "Foo foo bar" );
String uuid1=UUID.randomUUID().toString();
String uuid2=UUID.randomUUID().toString();
EslMessage resp=client.sendSyncApiCommand("originate ", "{origination_uuid=" + uuid1 + "}sofia/gateway/mygateway/OUTBOUND+18185551212 1004 park" );
log.info( "Response to 'park1': [{}]", resp );
for ( Entry header : resp.getHeaders().entrySet() )
{
log.info( " * header [{}]", header );
}
for ( String bodyLine : resp.getBodyLines() )
{
log.info( " * body [{}]", bodyLine );
}
SendMsg playMsg = new SendMsg(uuid1);
playMsg.addCallCommand( "execute" );
playMsg.addExecuteAppName( "speak" );
playMsg.addExecuteAppArg("cepstral|callie|welcome to java. Welcome to make call");
CommandResponse cmdresp = client.sendMessage(playMsg);
resp=client.sendSyncApiCommand("originate ", "{origination_uuid=" + uuid2 + "}sofia/gateway/mygateway/OUTBOUND+17325551212 1004 park" );
log.info( "Response to 'park2': [{}]", resp );
for ( Entry header : resp.getHeaders().entrySet() )
{
log.info( " * header [{}]", header );
}
for ( String bodyLine : resp.getBodyLines() )
{
log.info( " * body [{}]", bodyLine );
}
resp=client.sendSyncApiCommand("uuid_bridge ", uuid1 + " " + uuid2 );
log.info( "Response to 'uuid_bridge': [{}]", resp );
for ( Entry header : resp.getHeaders().entrySet() )
{
log.info( " * header [{}]", header );
}
for ( String bodyLine : resp.getBodyLines() )
{
log.info( " * body [{}]", bodyLine );
}
//CommandResponse cmd=CommandResponse( bMsg.toString(), resp);
//client.sendSyncApiCommand("originate", "sofia/gateway/mygateway/OUTBOUND+17325551212 park");
//client.sendSyncApiCommand("bridge", "sofia/gateway/mygateway/OUTBOUND+18185551212 1004 park" );
// client.sendSyncApiCommand( "sofia status", "" );
String jobId = client.sendAsyncApiCommand( "status", "" );
log.info( "Job id [{}] for [status]", jobId );
client.sendSyncApiCommand( "version", "" );
// client.sendAsyncApiCommand( "status", "" );
// client.sendSyncApiCommand( "sofia status", "" );
// client.sendAsyncApiCommand( "status", "" );
EslMessage response = client.sendSyncApiCommand( "sofia status", "" );
log.info( "sofia status = [{}]", response.getBodyLines().get( 3 ) );
// wait to see the heartbeat events arrive
Thread.sleep( 40000 );
client.close();
}
@Test
public void do_multi_connects() throws InterruptedException
{
Client client = new Client();
log.info( "Client connecting .." );
try
{
client.connect( host, port, password, 2 );
}
catch ( InboundConnectionFailure e )
{
log.error( "Connect failed", e );
return;
}
log.info( "Client connected .." );
log.info( "Client connecting .." );
try
{
client.connect( host, port, password, 2 );
}
catch ( InboundConnectionFailure e )
{
log.error( "Connect failed", e );
return;
}
log.info( "Client connected .." );
client.close();
}
@Test
public void sofia_contact()
{
Client client = new Client();
try
{
client.connect( host, port, password, 2 );
}
catch ( InboundConnectionFailure e )
{
log.error( "Connect failed", e );
return;
}
EslMessage response = client.sendSyncApiCommand( "sofia_contact", "internal/102@192.xxx.xxx.xxx" );
log.info( "Response to 'sofia_contact': [{}]", response );
for ( Entry header : response.getHeaders().entrySet() )
{
log.info( " * header [{}]", header );
}
for ( String bodyLine : response.getBodyLines() )
{
log.info( " * body [{}]", bodyLine );
}
client.close();
}
}
Client.java
package net.freeswitch;
/*
* Copyright 2010 david varnes.
*
* Licensed under the Apache License, version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.freeswitch.esl.client.IEslEventListener;
import org.freeswitch.esl.client.inbound.InboundClientHandler;
import org.freeswitch.esl.client.inbound.InboundConnectionFailure;
import org.freeswitch.esl.client.inbound.InboundPipelineFactory;
import org.freeswitch.esl.client.internal.IEslProtocolListener;
import org.freeswitch.esl.client.transport.CommandResponse;
import org.freeswitch.esl.client.transport.SendMsg;
import org.freeswitch.esl.client.transport.event.EslEvent;
import org.freeswitch.esl.client.transport.message.EslMessage;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Entry point to connect to a running FreeSWITCH Event Socket Library module, as a client.
* * This class provides what the FreeSWITCH documentation refers to as an 'Inbound' connection
* to the Event Socket module. That is, with reference to the socket listening on the FreeSWITCH
* server, this client occurs as an inbound connection to the server.
* * See http://wiki.freeswitch.org/wiki/Mod_event_socket
*
* @author david varnes
*/
public class Client
{
private final Logger log = LoggerFactory.getLogger( this.getClass() );
private final List eventListeners = new CopyOnWriteArrayList();
private final Executor eventListenerExecutor = Executors.newSingleThreadExecutor(
new ThreadFactory()
{
AtomicInteger threadNumber = new AtomicInteger( 1 );
public Thread newThread( Runnable r )
{
return new Thread( r, "EslEventNotifier-" + threadNumber.getAndIncrement() );
}
});
private final Executor backgroundJobListenerExecutor = Executors.newSingleThreadExecutor(
new ThreadFactory()
{
AtomicInteger threadNumber = new AtomicInteger( 1 );
public Thread newThread( Runnable r )
{
return new Thread( r, "EslBackgroundJobNotifier-" + threadNumber.getAndIncrement() );
}
});
private AtomicBoolean authenticatorResponded = new AtomicBoolean( false );
private boolean authenticated;
private CommandResponse authenticationResponse;
private Channel channel;
public boolean canSend()
{
return channel != null && channel.isConnected() && authenticated;
}
public void addEventListener( IEslEventListener listener )
{
if ( listener != null )
{
eventListeners.add( listener );
}
}
/**
* Attempt to establish an authenticated connection to the nominated FreeSWITCH ESL server socket.
* This call will block, waiting for an authentication handshake to occur, or timeout after the
* supplied number of seconds.
*
* @param host can be either ip address or hostname
* @param port tcp port that server socket is listening on (set in event_socket_conf.xml)
* @param password server event socket is expecting (set in event_socket_conf.xml)
* @param timeoutSeconds number of seconds to wait for the server socket before aborting
*/
public void connect( String host, int port, String password, int timeoutSeconds ) throws InboundConnectionFailure
{
// If already connected, disconnect first
if ( canSend() )
{
close();
}
// Configure this client
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool() ) );
// Add ESL handler and factory
InboundClientHandler handler = new InboundClientHandler( password, protocolListener );
bootstrap.setPipelineFactory( new InboundPipelineFactory( handler ) );
// Attempt connection
ChannelFuture future = bootstrap.connect( new InetSocketAddress( host, port ) );
// Wait till attempt succeeds, fails or timeouts
if ( ! future.awaitUninterruptibly( timeoutSeconds, TimeUnit.SECONDS ) )
{
throw new InboundConnectionFailure( "Timeout connecting to " + host + ":" + port );
}
// Did not timeout
channel = future.getChannel();
// But may have failed anyway
if ( !future.isSuccess() )
{
log.warn( "Failed to connect to [{}:{}]", host, port );
log.warn( " * reason: {}", future.getCause() );
channel = null;
bootstrap.releaseExternalResources();
throw new InboundConnectionFailure( "Could not connect to " + host + ":" + port, future.getCause() );
}
// Wait for the authentication handshake to call back
while ( ! authenticatorResponded.get() )
{
try
{
Thread.sleep( 250 );
}
catch ( InterruptedException e )
{
// ignore
}
}
if ( ! authenticated )
{
throw new InboundConnectionFailure( "Authentication failed: " + authenticationResponse.getReplyText() );
}
}
/**
* Sends a FreeSWITCH API command to the server and blocks, waiting for an immediate response from the
* server.
* * The outcome of the command from the server is retured in an {@link EslMessage} object.
*
* @param command API command to send
* @param arg command arguments
* @return an {@link EslMessage} containing command results
*/
public EslMessage sendSyncApiCommand( String command, String arg )
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
StringBuilder sb = new StringBuilder();
if ( command != null && !command.isEmpty() )
{
sb.append( "api " );
sb.append( command );
}
if ( arg != null && !arg.isEmpty() )
{
sb.append( arg );
}
return handler.sendSyncSingleLineCommand( channel, sb.toString() );
}
/**
* Submit a FreeSWITCH API command to the server to be executed in background mode. A synchronous
* response from the server provides a UUID to identify the job execution results. When the server
* has completed the job execution it fires a BACKGROUND_JOB Event with the execution results.* Note that this Client must be subscribed in the normal way to BACKGOUND_JOB Events, in order to
* receive this event.
*
* @param command API command to send
* @param arg command arguments
* @return String Job-UUID that the server will tag result event with.
*/
public String sendAsyncApiCommand( String command, String arg )
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
StringBuilder sb = new StringBuilder();
if ( command != null && !command.isEmpty() )
{
sb.append( "bgapi " );
sb.append( command );
}
if ( arg != null && !arg.isEmpty() )
{
sb.append( ' ' );
sb.append( arg );
}
return handler.sendAsyncCommand( channel, sb.toString() );
}
/**
* Set the current event subscription for this connection to the server. Examples of the events
* argument are:
* * ALL
* CHANNEL_CREATE CHANNEL_DESTROY HEARTBEAT
* CUSTOM conference::maintenance
* CHANNEL_CREATE CHANNEL_DESTROY CUSTOM conference::maintenance sofia::register sofia::expire
*
* Subsequent calls to this method replaces any previous subscriptions that were set.
*
* Note: current implementation can only process 'plain' events.
*
* @param format can be { plain | xml }
* @param events { all | space separated list of events }
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse setEventSubscriptions( String format, String events )
{
// temporary hack
if ( ! format.equals( "plain" ) )
{
throw new IllegalStateException( "Only 'plain' event format is supported at present" );
}
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
StringBuilder sb = new StringBuilder();
if ( format != null && !format.isEmpty() )
{
sb.append( "event " );
sb.append( format );
}
if ( events != null && !events.isEmpty() )
{
sb.append( ' ' );
sb.append( events );
}
EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() );
return new CommandResponse( sb.toString(), response );
}
/**
* Cancel any existing event subscription.
*
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse cancelEventSubscriptions()
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
EslMessage response = handler.sendSyncSingleLineCommand( channel, "noevents" );
return new CommandResponse( "noevents", response );
}
/**
* Add an event filter to the current set of event filters on this connection. Any of the event headers
* can be used as a filter.
*
* Note that event filters follow 'filter-in' semantics. That is, when a filter is applied
* only the filtered values will be received. Multiple filters can be added to the current
* connection.
*
* Example filters:
* * eventHeader valueToFilter
* ----------------------------------
* Event-Name CHANNEL_EXECUTE
* Channel-State CS_NEW
*
*
* @param eventHeader to filter on
* @param valueToFilter the value to match
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse addEventFilter( String eventHeader, String valueToFilter )
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
StringBuilder sb = new StringBuilder();
if ( eventHeader != null && !eventHeader.isEmpty() )
{
sb.append( "filter " );
sb.append( eventHeader );
}
if ( valueToFilter != null && !valueToFilter.isEmpty() )
{
sb.append( ' ' );
sb.append( valueToFilter );
}
EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() );
return new CommandResponse( sb.toString(), response );
}
/**
* Delete an event filter from the current set of event filters on this connection. See
* {@link Client.addEventFilter}
*
* @param eventHeader to remove
* @param valueToFilter to remove
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse deleteEventFilter( String eventHeader, String valueToFilter )
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
StringBuilder sb = new StringBuilder();
if ( eventHeader != null && !eventHeader.isEmpty() )
{
sb.append( "filter delete " );
sb.append( eventHeader );
}
if ( valueToFilter != null && !valueToFilter.isEmpty() )
{
sb.append( ' ' );
sb.append( valueToFilter );
}
EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() );
return new CommandResponse( sb.toString(), response );
}
/**
* Send a {@link SendMsg} command to FreeSWITCH. This client requires that the {@link SendMsg}
* has a call UUID parameter.
*
* @param sendMsg a {@link SendMsg} with call UUID
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse sendMessage( SendMsg sendMsg )
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
EslMessage response = handler.sendSyncMultiLineCommand( channel, sendMsg.getMsgLines() );
return new CommandResponse( sendMsg.toString(), response );
}
/**
* Enable log output.
*
* @param level using the same values as in console.conf
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse setLoggingLevel( String level )
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
StringBuilder sb = new StringBuilder();
if ( level != null && !level.isEmpty() )
{
sb.append( "log " );
sb.append( level );
}
EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() );
return new CommandResponse( sb.toString(), response );
}
/**
* Disable any logging previously enabled with setLogLevel().
*
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse cancelLogging()
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
EslMessage response = handler.sendSyncSingleLineCommand( channel, "nolog" );
return new CommandResponse( "nolog", response );
}
/**
* Close the socket connection
*
* @return a {@link CommandResponse} with the server's response.
*/
public CommandResponse close()
{
checkConnected();
InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
EslMessage response = handler.sendSyncSingleLineCommand( channel, "exit" );
return new CommandResponse( "exit", response );
}
/*
* Internal observer of the ESL protocol
*/
private final IEslProtocolListener protocolListener = new IEslProtocolListener()
{
public void authResponseReceived( CommandResponse response )
{
authenticatorResponded.set( true );
authenticated = response.isOk();
authenticationResponse = response;
log.debug( "Auth response success={}, message=[{}]", authenticated, response.getReplyText() );
}
public void eventReceived( final EslEvent event )
{
log.debug( "Event received [{}]", event );
/*
* Notify listeners in a different thread in order to:
* - not to block the IO threads with potentially long-running listeners
* - generally be defensive running other people's code
* Use a different worker thread pool for async job results than for event driven
* events to keep the latency as low as possible.
*/
if ( event.getEventName().equals( "BACKGROUND_JOB" ) )
{
for ( final IEslEventListener listener : eventListeners )
{
backgroundJobListenerExecutor.execute( new Runnable()
{
public void run()
{
try
{
listener.backgroundJobResultReceived( event );
}
catch ( Throwable t )
{
log.error( "Error caught notifying listener of job result [" + event + ']', t );
}
}
} );
}
}
else
{
for ( final IEslEventListener listener : eventListeners )
{
eventListenerExecutor.execute( new Runnable()
{
public void run()
{
try
{
listener.eventReceived( event );
}
catch ( Throwable t )
{
log.error( "Error caught notifying listener of event [" + event + ']', t );
}
}
} );
}
}
}
public void disconnected()
{
log.info( "Disconnected .." );
}
};
private void checkConnected()
{
if ( ! canSend() )
{
throw new IllegalStateException( "Not connected to FreeSWITCH Event Socket" );
}
}
}
No comments:
Post a Comment