Monday, January 19, 2015

Bridging two calls in FreeSWITCH with the Java ESL client


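Jars required (the code below also imports org.slf4j, so an SLF4J API jar and a logging binding must be on the classpath as well):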
  • hamcrest-all-1.1.jar
  • junit.jar
  • netty-3.6.3.Final.jar
  • org.freeswitch.esl.client-0.9.3-SNAPSHOT.jar
package net.freeswitch;
/*
 * Copyright 2010 david varnes.
 *
 * Licensed under the Apache License, version 2.0 (the "License"); 
 * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.util.Map.Entry;
import java.util.UUID;

import org.freeswitch.esl.client.IEslEventListener;
import org.freeswitch.esl.client.inbound.InboundConnectionFailure;
import org.freeswitch.esl.client.transport.CommandResponse;
import org.freeswitch.esl.client.transport.SendMsg;
import org.freeswitch.esl.client.transport.event.EslEvent;
import org.freeswitch.esl.client.transport.message.EslHeaders.Name;
import org.freeswitch.esl.client.transport.message.EslMessage;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manual integration tests that drive {@link Client} against a FreeSWITCH event socket.
 *
 * The tests expect a server listening on {@code host:port} and accepting {@code password}.
 * When the connection cannot be established the failure is logged and the test returns
 * early, so these tests only exercise anything when a server is actually reachable.
 */
public class ClientTest
{
    private final Logger log = LoggerFactory.getLogger( this.getClass() );

    private final String host = "127.0.0.1";
    private final int port = 8021;
    private final String password = "ClueCon";

    /**
     * Connects, subscribes to events, originates two parked call legs, speaks a greeting on
     * the first leg, bridges both legs and finally issues a few status commands.
     *
     * @throws InterruptedException if the final wait for heartbeat events is interrupted
     */
    @Test
    public void do_connect() throws InterruptedException
    {
        Client client = new Client();

        client.addEventListener( new IEslEventListener()
        {
            public void eventReceived( EslEvent event )
            {
                log.info( "Event received [{}]", event );
            }
            public void backgroundJobResultReceived( EslEvent event )
            {
                log.info( "Background job result received [{}]", event );
            }
        } );

        log.info( "Client connecting .." );
        try
        {
            client.connect( host, port, password, 20 );
        }
        catch ( InboundConnectionFailure e )
        {
            log.error( "Connect failed", e );
            return;
        }
        log.info( "Client connected .." );

        try
        {
            // Exercise subscribe / filter / cancel / re-subscribe in sequence.
            client.setEventSubscriptions( "plain", "all" );
            client.addEventFilter( "Event-Name", "heartbeat" );
            client.cancelEventSubscriptions();
            client.setEventSubscriptions( "plain", "all" );
            client.addEventFilter( "Event-Name", "heartbeat" );
            client.addEventFilter( "Event-Name", "channel_create" );
            client.addEventFilter( "Event-Name", "background_job" );

            // Each command name below carries its own trailing space so that it stays
            // separated from the argument string that follows it.
            client.sendSyncApiCommand( "echo ", "Foo foo bar" );

            // The UUIDs are chosen up front so both legs can be addressed later on.
            String uuid1 = UUID.randomUUID().toString();
            String uuid2 = UUID.randomUUID().toString();

            EslMessage resp = client.sendSyncApiCommand( "originate ",
                "{origination_uuid=" + uuid1 + "}sofia/gateway/mygateway/OUTBOUND+18185551212 1004 park" );
            logResponse( "park1", resp );

            // Speak a greeting on the first leg.
            SendMsg playMsg = new SendMsg( uuid1 );
            playMsg.addCallCommand( "execute" );
            playMsg.addExecuteAppName( "speak" );
            playMsg.addExecuteAppArg( "cepstral|callie|welcome to java. Welcome to make call" );
            CommandResponse speakResponse = client.sendMessage( playMsg );
            log.info( "Response to 'speak': [{}]", speakResponse.getReplyText() );

            resp = client.sendSyncApiCommand( "originate ",
                "{origination_uuid=" + uuid2 + "}sofia/gateway/mygateway/OUTBOUND+17325551212 1004 park" );
            logResponse( "park2", resp );

            // Join the two parked legs.
            resp = client.sendSyncApiCommand( "uuid_bridge ", uuid1 + " " + uuid2 );
            logResponse( "uuid_bridge", resp );

            String jobId = client.sendAsyncApiCommand( "status", "" );
            log.info( "Job id [{}] for [status]", jobId );
            client.sendSyncApiCommand( "version", "" );

            EslMessage response = client.sendSyncApiCommand( "sofia status", "" );
            // Only index into the body when the reply really has a fourth line.
            if ( response.getBodyLines().size() > 3 )
            {
                log.info( "sofia status = [{}]", response.getBodyLines().get( 3 ) );
            }
            else
            {
                log.warn( "sofia status returned only [{}] body line(s)", response.getBodyLines().size() );
            }

            // wait to see the heartbeat events arrive
            Thread.sleep( 40000 );
        }
        finally
        {
            closeQuietly( client );
        }
    }

    /**
     * Calls {@code connect} twice on the same client instance and then closes it.
     *
     * @throws InterruptedException declared for parity with the other tests; not thrown here
     */
    @Test
    public void do_multi_connects() throws InterruptedException
    {
        Client client = new Client();

        try
        {
            for ( int attempt = 0; attempt < 2; attempt++ )
            {
                log.info( "Client connecting .." );
                try
                {
                    client.connect( host, port, password, 2 );
                }
                catch ( InboundConnectionFailure e )
                {
                    log.error( "Connect failed", e );
                    return;
                }
                log.info( "Client connected .." );
            }
        }
        finally
        {
            closeQuietly( client );
        }
    }

    /**
     * Issues a {@code sofia_contact} lookup and logs the complete reply.
     */
    @Test
    public void sofia_contact()
    {
        Client client = new Client();
        try
        {
            client.connect( host, port, password, 2 );
        }
        catch ( InboundConnectionFailure e )
        {
            log.error( "Connect failed", e );
            return;
        }

        try
        {
            // Trailing space keeps the command name separated from its argument.
            EslMessage response = client.sendSyncApiCommand( "sofia_contact ", "internal/102@192.xxx.xxx.xxx" );
            logResponse( "sofia_contact", response );
        }
        finally
        {
            closeQuietly( client );
        }
    }

    /**
     * Logs a reply followed by each of its headers and body lines.
     *
     * @param label   name of the command the reply belongs to, used in the log line
     * @param message reply to dump
     */
    private void logResponse( String label, EslMessage message )
    {
        log.info( "Response to '{}': [{}]", label, message );
        for ( Entry<Name, String> header : message.getHeaders().entrySet() )
        {
            log.info( " * header [{}]", header );
        }
        for ( String bodyLine : message.getBodyLines() )
        {
            log.info( " * body [{}]", bodyLine );
        }
    }

    /**
     * Closes the client only when it still reports a usable connection, so that cleanup in a
     * {@code finally} block cannot replace the exception that ended the test.
     *
     * @param client client to close
     */
    private void closeQuietly( Client client )
    {
        if ( client.canSend() )
        {
            client.close();
        }
    }
}
Client.java
package net.freeswitch;

/*
 * Copyright 2010 david varnes.
 *
 * Licensed under the Apache License, version 2.0 (the "License"); 
 * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.freeswitch.esl.client.IEslEventListener;
import org.freeswitch.esl.client.inbound.InboundClientHandler;
import org.freeswitch.esl.client.inbound.InboundConnectionFailure;
import org.freeswitch.esl.client.inbound.InboundPipelineFactory;
import org.freeswitch.esl.client.internal.IEslProtocolListener;
import org.freeswitch.esl.client.transport.CommandResponse;
import org.freeswitch.esl.client.transport.SendMsg;
import org.freeswitch.esl.client.transport.event.EslEvent;
import org.freeswitch.esl.client.transport.message.EslMessage;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point to connect to a running FreeSWITCH Event Socket Library module, as a client.
 * * This class provides what the FreeSWITCH documentation refers to as an 'Inbound' connection
 * to the Event Socket module. That is, with reference to the socket listening on the FreeSWITCH
 * server, this client occurs as an inbound connection to the server.
 * * See http://wiki.freeswitch.org/wiki/Mod_event_socket
 * 
 * @author  david varnes
 */
public class Client
{
    private final Logger log = LoggerFactory.getLogger( this.getClass() );
    
    private final List eventListeners = new CopyOnWriteArrayList();
    private final Executor eventListenerExecutor = Executors.newSingleThreadExecutor( 
        new ThreadFactory()
        {
            AtomicInteger threadNumber = new AtomicInteger( 1 );
            public Thread newThread( Runnable r )
            {
                return new Thread( r, "EslEventNotifier-" + threadNumber.getAndIncrement() );
            }
        });
    private final Executor backgroundJobListenerExecutor = Executors.newSingleThreadExecutor(
        new ThreadFactory()
        {
            AtomicInteger threadNumber = new AtomicInteger( 1 );
            public Thread newThread( Runnable r )
            {
                return new Thread( r, "EslBackgroundJobNotifier-" + threadNumber.getAndIncrement() );
            }
        });
    
    private AtomicBoolean authenticatorResponded = new AtomicBoolean( false );
    private boolean authenticated;
    private CommandResponse authenticationResponse;
    private Channel channel;
    
    public boolean canSend()
    {
        return channel != null && channel.isConnected() && authenticated; 
    }
    
    public void addEventListener( IEslEventListener listener )
    {
        if ( listener != null )
        {
            eventListeners.add( listener );
        }
    }

    /**
     * Attempt to establish an authenticated connection to the nominated FreeSWITCH ESL server socket.
     * This call will block, waiting for an authentication handshake to occur, or timeout after the
     * supplied number of seconds.  
     *  
     * @param host can be either ip address or hostname
     * @param port tcp port that server socket is listening on (set in event_socket_conf.xml)
     * @param password server event socket is expecting (set in event_socket_conf.xml) 
     * @param timeoutSeconds number of seconds to wait for the server socket before aborting
     */
    public void connect( String host, int port, String password, int timeoutSeconds ) throws InboundConnectionFailure
    {
        // If already connected, disconnect first
        if ( canSend() )
        {
            close();
        }
        
        // Configure this client
        ClientBootstrap bootstrap = new ClientBootstrap(
            new NioClientSocketChannelFactory( 
                Executors.newCachedThreadPool(), 
                Executors.newCachedThreadPool() ) ); 
        
        // Add ESL handler and factory
        InboundClientHandler handler = new InboundClientHandler( password, protocolListener );
        bootstrap.setPipelineFactory( new InboundPipelineFactory( handler ) );
        
        // Attempt connection
        ChannelFuture future = bootstrap.connect( new InetSocketAddress( host, port ) );
        
        // Wait till attempt succeeds, fails or timeouts
        if ( ! future.awaitUninterruptibly( timeoutSeconds, TimeUnit.SECONDS ) )
        {
            throw new InboundConnectionFailure( "Timeout connecting to " + host + ":" + port );
        }
        // Did not timeout 
        channel = future.getChannel();
        // But may have failed anyway
        if ( !future.isSuccess() )
        {
            log.warn( "Failed to connect to [{}:{}]", host, port );
            log.warn( "  * reason: {}", future.getCause() );
            
            channel = null;
            bootstrap.releaseExternalResources();
            
            throw new InboundConnectionFailure( "Could not connect to " + host + ":" + port, future.getCause() );
        }
        
        //  Wait for the authentication handshake to call back
        while ( ! authenticatorResponded.get() )
        {
            try
            {
                Thread.sleep( 250 );
            } 
            catch ( InterruptedException e )
            {
                // ignore
            }
        }
        
        if ( ! authenticated )
        {
            throw new InboundConnectionFailure( "Authentication failed: " + authenticationResponse.getReplyText() );
        }
    }
    
    /**
     * Sends a FreeSWITCH API command to the server and blocks, waiting for an immediate response from the 
     * server.
     * * The outcome of the command from the server is retured in an {@link EslMessage} object.
     * 
     * @param command API command to send
     * @param arg command arguments
     * @return an {@link EslMessage} containing command results
     */
    public EslMessage sendSyncApiCommand( String command, String arg )
    {
        checkConnected();
        InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
        StringBuilder sb = new StringBuilder();
        if ( command != null && !command.isEmpty() )
        {
            sb.append( "api " );
            sb.append( command );
        }
        if ( arg != null && !arg.isEmpty() )
        {
            sb.append( arg );
        }

        return handler.sendSyncSingleLineCommand( channel, sb.toString() );
    }
    
    /**
     * Submit a FreeSWITCH API command to the server to be executed in background mode. A synchronous 
     * response from the server provides a UUID to identify the job execution results. When the server
     * has completed the job execution it fires a BACKGROUND_JOB Event with the execution results.* Note that this Client must be subscribed in the normal way to BACKGOUND_JOB Events, in order to 
     * receive this event.
     *     
     * @param command API command to send
     * @param arg command arguments
     * @return String Job-UUID that the server will tag result event with.
     */
    public String sendAsyncApiCommand( String command, String arg )
    {
        checkConnected();
        InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast();
        StringBuilder sb = new StringBuilder();
        if ( command != null && !command.isEmpty() )
        {
            sb.append( "bgapi " );
            sb.append( command );
        }
        if ( arg != null && !arg.isEmpty() )
        {
            sb.append( ' ' );
            sb.append( arg );
        }
        
        return handler.sendAsyncCommand( channel, sb.toString() );
    }
    
    /**
     * Set the current event subscription for this connection to the server.  Examples of the events 
     * argument are:
     * 
     *   ALL
     *   CHANNEL_CREATE CHANNEL_DESTROY HEARTBEAT
     *   CUSTOM conference::maintenance
     *   CHANNEL_CREATE CHANNEL_DESTROY CUSTOM conference::maintenance sofia::register sofia::expire
     * 
* Subsequent calls to this method replaces any previous subscriptions that were set. * * Note: current implementation can only process 'plain' events. * * @param format can be { plain | xml } * @param events { all | space separated list of events } * @return a {@link CommandResponse} with the server's response. */ public CommandResponse setEventSubscriptions( String format, String events ) { // temporary hack if ( ! format.equals( "plain" ) ) { throw new IllegalStateException( "Only 'plain' event format is supported at present" ); } checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); StringBuilder sb = new StringBuilder(); if ( format != null && !format.isEmpty() ) { sb.append( "event " ); sb.append( format ); } if ( events != null && !events.isEmpty() ) { sb.append( ' ' ); sb.append( events ); } EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() ); return new CommandResponse( sb.toString(), response ); } /** * Cancel any existing event subscription. * * @return a {@link CommandResponse} with the server's response. */ public CommandResponse cancelEventSubscriptions() { checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); EslMessage response = handler.sendSyncSingleLineCommand( channel, "noevents" ); return new CommandResponse( "noevents", response ); } /** * Add an event filter to the current set of event filters on this connection. Any of the event headers * can be used as a filter. * * Note that event filters follow 'filter-in' semantics. That is, when a filter is applied * only the filtered values will be received. Multiple filters can be added to the current * connection. * * Example filters: *
     *    eventHeader        valueToFilter
     *    ----------------------------------
     *    Event-Name         CHANNEL_EXECUTE
     *    Channel-State      CS_NEW
     * 
* * @param eventHeader to filter on * @param valueToFilter the value to match * @return a {@link CommandResponse} with the server's response. */ public CommandResponse addEventFilter( String eventHeader, String valueToFilter ) { checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); StringBuilder sb = new StringBuilder(); if ( eventHeader != null && !eventHeader.isEmpty() ) { sb.append( "filter " ); sb.append( eventHeader ); } if ( valueToFilter != null && !valueToFilter.isEmpty() ) { sb.append( ' ' ); sb.append( valueToFilter ); } EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() ); return new CommandResponse( sb.toString(), response ); } /** * Delete an event filter from the current set of event filters on this connection. See * {@link Client.addEventFilter} * * @param eventHeader to remove * @param valueToFilter to remove * @return a {@link CommandResponse} with the server's response. */ public CommandResponse deleteEventFilter( String eventHeader, String valueToFilter ) { checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); StringBuilder sb = new StringBuilder(); if ( eventHeader != null && !eventHeader.isEmpty() ) { sb.append( "filter delete " ); sb.append( eventHeader ); } if ( valueToFilter != null && !valueToFilter.isEmpty() ) { sb.append( ' ' ); sb.append( valueToFilter ); } EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() ); return new CommandResponse( sb.toString(), response ); } /** * Send a {@link SendMsg} command to FreeSWITCH. This client requires that the {@link SendMsg} * has a call UUID parameter. * * @param sendMsg a {@link SendMsg} with call UUID * @return a {@link CommandResponse} with the server's response. 
*/ public CommandResponse sendMessage( SendMsg sendMsg ) { checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); EslMessage response = handler.sendSyncMultiLineCommand( channel, sendMsg.getMsgLines() ); return new CommandResponse( sendMsg.toString(), response ); } /** * Enable log output. * * @param level using the same values as in console.conf * @return a {@link CommandResponse} with the server's response. */ public CommandResponse setLoggingLevel( String level ) { checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); StringBuilder sb = new StringBuilder(); if ( level != null && !level.isEmpty() ) { sb.append( "log " ); sb.append( level ); } EslMessage response = handler.sendSyncSingleLineCommand( channel, sb.toString() ); return new CommandResponse( sb.toString(), response ); } /** * Disable any logging previously enabled with setLogLevel(). * * @return a {@link CommandResponse} with the server's response. */ public CommandResponse cancelLogging() { checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); EslMessage response = handler.sendSyncSingleLineCommand( channel, "nolog" ); return new CommandResponse( "nolog", response ); } /** * Close the socket connection * * @return a {@link CommandResponse} with the server's response. 
*/ public CommandResponse close() { checkConnected(); InboundClientHandler handler = (InboundClientHandler)channel.getPipeline().getLast(); EslMessage response = handler.sendSyncSingleLineCommand( channel, "exit" ); return new CommandResponse( "exit", response ); } /* * Internal observer of the ESL protocol */ private final IEslProtocolListener protocolListener = new IEslProtocolListener() { public void authResponseReceived( CommandResponse response ) { authenticatorResponded.set( true ); authenticated = response.isOk(); authenticationResponse = response; log.debug( "Auth response success={}, message=[{}]", authenticated, response.getReplyText() ); } public void eventReceived( final EslEvent event ) { log.debug( "Event received [{}]", event ); /* * Notify listeners in a different thread in order to: * - not to block the IO threads with potentially long-running listeners * - generally be defensive running other people's code * Use a different worker thread pool for async job results than for event driven * events to keep the latency as low as possible. */ if ( event.getEventName().equals( "BACKGROUND_JOB" ) ) { for ( final IEslEventListener listener : eventListeners ) { backgroundJobListenerExecutor.execute( new Runnable() { public void run() { try { listener.backgroundJobResultReceived( event ); } catch ( Throwable t ) { log.error( "Error caught notifying listener of job result [" + event + ']', t ); } } } ); } } else { for ( final IEslEventListener listener : eventListeners ) { eventListenerExecutor.execute( new Runnable() { public void run() { try { listener.eventReceived( event ); } catch ( Throwable t ) { log.error( "Error caught notifying listener of event [" + event + ']', t ); } } } ); } } } public void disconnected() { log.info( "Disconnected .." ); } }; private void checkConnected() { if ( ! canSend() ) { throw new IllegalStateException( "Not connected to FreeSWITCH Event Socket" ); } } }

No comments:

Post a Comment