View Javadoc

1   /*******************************************************************************
2    *  Copyright (c) 2005, 2006 Imola Informatica.
3    *  All rights reserved. This program and the accompanying materials
4    *  are made available under the terms of the LGPL License v2.1
5    *  which accompanies this distribution, and is available at
6    *  http://www.gnu.org/licenses/lgpl.html
7    *******************************************************************************/
8   package it.imolinfo.jbi4cics.jbi.xfire;
9   
10  import it.imolinfo.jbi4cics.Logger;
11  import it.imolinfo.jbi4cics.LoggerFactory;
12  import it.imolinfo.jbi4cics.jbi.Messages;
13  import java.io.ByteArrayInputStream;
14  import java.io.ByteArrayOutputStream;
15  import java.io.IOException;
16  import java.io.OutputStream;
17  
18  import javax.jbi.messaging.DeliveryChannel;
19  import javax.jbi.messaging.ExchangeStatus;
20  import javax.jbi.messaging.InOut;
21  import javax.jbi.messaging.MessageExchangeFactory;
22  import javax.jbi.messaging.NormalizedMessage;
23  import javax.jbi.servicedesc.ServiceEndpoint;
24  import javax.xml.namespace.QName;
25  import javax.xml.stream.XMLOutputFactory;
26  import javax.xml.stream.XMLStreamException;
27  import javax.xml.stream.XMLStreamWriter;
28  import javax.xml.transform.Source;
29  import javax.xml.transform.stream.StreamSource;
30  
31  import org.apache.servicemix.jbi.jaxp.StAXSourceTransformer;
32  import org.codehaus.xfire.MessageContext;
33  import org.codehaus.xfire.XFireException;
34  import org.codehaus.xfire.exchange.InMessage;
35  import org.codehaus.xfire.exchange.MessageSerializer;
36  import org.codehaus.xfire.exchange.OutMessage;
37  import org.codehaus.xfire.fault.XFireFault;
38  import org.codehaus.xfire.soap.AbstractSoapBinding;
39  import org.codehaus.xfire.transport.AbstractChannel;
40  import org.codehaus.xfire.transport.Channel;
41  
42  /**
43   * Jbi channel, only support local invocations. 
44   */
45  public class JbiChannel extends AbstractChannel {
46  
47      public static final String JBI_INTERFACE_NAME = "jbi.interface";
48      public static final String JBI_SERVICE_NAME = "jbi.service";
49      public static final String JBI_ENDPOINT = "jbi.endpoint";
50      
51      /**
52       * The logger for this class and its instances.
53       */
54      private static final Logger LOG
55              = LoggerFactory.getLogger(JbiChannel.class);    
56      /**
57       * The responsible to translate localized messages.
58       */
59      private static final Messages MESSAGES
60              = Messages.getMessages(JbiChannel.class);   
61      
62      private StAXSourceTransformer sourceTransformer;
63      private XMLOutputFactory outputFactory;
64   
65      
66      public JbiChannel(String uri, JbiTransport transport) {
67          setTransport(transport);
68          setUri(uri);
69          this.sourceTransformer = new StAXSourceTransformer();
70          this.outputFactory = XMLOutputFactory.newInstance();
71      }
72  
73      public void open() throws Exception {
74      }
75  
76      public void send(MessageContext context, OutMessage message) throws XFireException {
77          if (Channel.BACKCHANNEL_URI.equals(message.getUri())) {
78              final OutputStream out = (OutputStream) context.getProperty(Channel.BACKCHANNEL_URI);
79              if (out != null) {
80                  try {
81                      final XMLStreamWriter writer = outputFactory.createXMLStreamWriter(out, message.getEncoding());
82                      message.getSerializer().writeMessage(message, writer, context);
83                      writer.close();
84                  } catch (XMLStreamException e) {
85                      LOG.error("CIC001400_Error_closing_output_stream", new Object[] {e.getMessage()}, e);
86                      throw new XFireException(MESSAGES.getString("CIC001400_Error_closing_output_stream", e.getMessage()), e);
87                  }
88                  return;
89              }
90          } else {
91              try {
92                  DeliveryChannel channel = ((JbiTransport) getTransport()).getContext().getDeliveryChannel();
93                  MessageExchangeFactory factory = channel.createExchangeFactory();
94                  if (context.getExchange().hasOutMessage()) {
95                      InOut me = factory.createInOutExchange();
96                      me.setInterfaceName((QName) context.getService().getProperty(JBI_INTERFACE_NAME));
97                      me.setService((QName) context.getService().getProperty(JBI_SERVICE_NAME));
98                      me.setEndpoint((ServiceEndpoint) context.getService().getProperty(JBI_ENDPOINT));
99                      NormalizedMessage msg = me.createMessage();
100                     me.setInMessage(msg);
101                     msg.setContent(getContent(context, message));
102                     if (!channel.sendSync(me)) {
103                         throw new XFireException(MESSAGES.getString("CIC001401_Error_sending_jbi_exchange"));
104                     }
105                     if (me.getStatus() == ExchangeStatus.ERROR) {
106                         me.setStatus(ExchangeStatus.DONE);
107                         channel.send(me);
108                         if (me.getError() != null) {
109                             throw new XFireFault(me.getError(), XFireFault.RECEIVER);
110                         } else if (me.getFault() != null){
111                             // TODO: retrieve fault
112                             throw new XFireFault(MESSAGES.getString("CIC001402_Fault_received"), XFireFault.RECEIVER);
113                         } else {
114                             throw new XFireFault(MESSAGES.getString("CIC001403_Unknown_error"), XFireFault.RECEIVER);
115                         }
116                     }
117                     Source outSrc = me.getOutMessage().getContent();
118                     me.setStatus(ExchangeStatus.DONE);
119                     channel.send(me);
120 
121                     InMessage inMessage = new InMessage(sourceTransformer.toXMLStreamReader(outSrc), getUri());
122                     getEndpoint().onReceive(context, inMessage);
123                 } else {
124                     // TODO
125                 }
126                 
127                 
128             } catch (Exception e) {
129                 LOG.error("CIC001404_Error_sending_exchange", 
130                         e.getMessage(), e);
131                 throw new XFireException(MESSAGES.getString(
132                         "CIC001404_Error_sending_exchange", e.getMessage()), e);
133             }
134         }
135     }
136 
137     protected Source getContent(MessageContext context, OutMessage message) throws XMLStreamException, IOException, XFireException {
138         ByteArrayOutputStream outStream = new ByteArrayOutputStream();
139         XMLStreamWriter writer = outputFactory.createXMLStreamWriter(outStream, message.getEncoding());
140         MessageSerializer serializer = context.getOutMessage().getSerializer();
141         if (serializer == null)
142         {
143         	AbstractSoapBinding binding = (AbstractSoapBinding) context.getBinding();
144             if (binding == null)
145             {
146                 throw new XFireException(MESSAGES.getString("CIC001405_Binding_not_found"));
147             }
148             serializer = AbstractSoapBinding.getSerializer(binding.getStyle(), binding.getUse());
149         }
150         serializer.writeMessage(message, writer, context);
151         writer.close();
152         outStream.close();
153         StreamSource src = new StreamSource(new ByteArrayInputStream(outStream.toByteArray()));
154         return src;
155     }
156     
157 }