Sunday, 27 October 2013

Publish/Subscribe to Weblogic JMS Queue/Topic messages using Oracle Data Integrator


JMS queues and topics are used mainly in OLTP systems for asynchronous message processing and persistence, and for their robust support for competing consumers (queues) and subscribers (topics) while still maintaining XA capabilities. Oracle Data Integrator is a batch loading tool used mainly in OLAP systems (Oracle Apps/Fusion Apps are exceptions), but sometimes a requirement comes up to subscribe to topic data and store it in a database for further processing in OLAP systems. ODI supports JMS queues and topics with text/XML payloads. In this blog, I am going to write about the steps for publishing and subscribing to WebLogic JMS Queue/Topic messages using ODI. Let's get started.

First I am going to show the admin part of WebLogic, creating the queue and topic, for those who are new to admin activities. Later I will show the steps for publishing and subscribing to messages in ODI.

Create Queue/Topic in Weblogic

Create a new JMS server in WebLogic, targeted to the Admin server here. I haven't specified a persistent store here; you can configure a file-based or database-based persistent store for storing messages.
Create a JMS module targeted to the Admin server.
Create a subdeployment under the JMS module 'odimodule' we created earlier. I have targeted this to the JMS server we created earlier.
Create queue, topic and connection factory resources under 'odimodule' and select the subdeployment we created earlier. They are automatically targeted to jmsserver1 through the selected subdeployment. The connection factory is needed so that a client outside WebLogic can connect and get resources such as the queue and topic.

Now we have completed the admin activities needed to start the work in ODI. Let's test it using a Java client first, before we configure ODI. Please add the library wlthint3client.jar to your Java project.
package JMSClientApp;


import java.util.Hashtable;

import javax.naming.*;

import javax.jms.*;

public class JMSTest {
    private static InitialContext ctx = null;
    private static QueueConnectionFactory qcf = null;
    private static QueueConnection qc = null;
    private static QueueSession qsess = null;
    private static Queue q = null;
    private static QueueSender qsndr = null;
    private static TextMessage message = null;
    // NOTE: The next two lines set the name of the Queue Connection Factory
    //       and the Queue that we want to use.
    private static final String QCF_NAME = "jms/odicf";
    private static final String QUEUE_NAME = "jms/odiqueue";

    public JMSTest() {
        super();
    }

    public static void sendMessage(String messageText) {
        // create InitialContext
        Hashtable<String, String> properties = new Hashtable<String, String>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        // NOTE: The port number of the server is provided in the next line,
        //       followed by the userid and password on the next two lines.
        properties.put(Context.PROVIDER_URL, "t3://localhost:7001");
        properties.put(Context.SECURITY_PRINCIPAL, "weblogic");
        properties.put(Context.SECURITY_CREDENTIALS, "welcome123");
        try {
            ctx = new InitialContext(properties);
        } catch (NamingException ne) {
            ne.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Got InitialContext " + ctx.toString());
        // create QueueConnectionFactory
        try {
            qcf = (QueueConnectionFactory) ctx.lookup(QCF_NAME);
        } catch (NamingException ne) {
            ne.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Got QueueConnectionFactory " + qcf.toString());
        // create QueueConnection
        try {
            qc = qcf.createQueueConnection();
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Got QueueConnection " + qc.toString());
        // create QueueSession
        try {
            // non-transacted session; received messages are acknowledged automatically
            qsess = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Got QueueSession " + qsess.toString());
        // lookup Queue
        try {
            q = (Queue) ctx.lookup(QUEUE_NAME);
        } catch (NamingException ne) {
            ne.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Got Queue " + q.toString());
        // create QueueSender
        try {
            qsndr = qsess.createSender(q);
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Got QueueSender " + qsndr.toString());
        // create TextMessage
        try {
            message = qsess.createTextMessage();
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Got TextMessage " + message.toString());
        // set message text in TextMessage
        try {
            message.setText(messageText);
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Set text in TextMessage " + message.toString());
        // send message
        try {
            qsndr.send(message);
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(0);
        }
        System.out.println("Sent message ");
        // clean up
        try {
            message = null;
            qsndr.close();
            qsndr = null;
            q = null;
            qsess.close();
            qsess = null;
            qc.close();
            qc = null;
            qcf = null;
            ctx = null;
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
        }
        System.out.println("Cleaned up and done.");
    }

    public static void main(String args[]) {
        sendMessage("8,test3,24");
        sendMessage("9,test4,24");
    }
}

The Java client test is successful and the messages are published to the JMS queue.

Configuring JMS Queue in ODI
ODI provides four technologies in topology for JMS configuration: JMS Queue, JMS Queue XML, JMS Topic and JMS Topic XML. They are designed this way because JMS needs to fit into the ODI architecture, which handles text and XML in different ways. Basically, JMS Queue and JMS Topic handle text, CSV and fixed-length payloads and put them into relational tables, while JMS Queue XML and JMS Topic XML do the same for XML payloads. Here I am going to show text payloads using the CSV format. XML needs more configuration, which I will show in another blog or by extending this one.

Let me take JMS Queue for illustration here. It involves setting up the JNDI configuration of the connection factory for the WebLogic connection in the ODI JMS Queue technology in topology. Next we need to create a data model, and in the data model a datastore to represent the queue. Here are the steps:

Create a physical server in topology for the JMS Queue technology and provide the JNDI connection information for the connection factory. ODI makes the connection through the connection factory to get the resources. Click on test connection and it should succeed.
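The JNDI details entered on the physical server are the same ones the Java test client used earlier: the initial context factory class, the provider URL and the credentials, plus the connection factory's JNDI name (jms/odicf in this setup). As a rough sketch, they can be gathered in one place like this. The class name JndiSettings is made up for illustration, and the URL and credentials are just the sample values from the client above.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper (not part of ODI or WebLogic): collects the JNDI
// settings needed to reach the WebLogic server.
final class JndiSettings {

    // Builds the JNDI environment for a WebLogic server.
    static Hashtable<String, String> forWebLogic(String url, String user, String password) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        env.put(Context.PROVIDER_URL, url);
        env.put(Context.SECURITY_PRINCIPAL, user);
        env.put(Context.SECURITY_CREDENTIALS, password);
        return env;
    }

    public static void main(String[] args) {
        // Sample values taken from the test client earlier in this post.
        Hashtable<String, String> env = forWebLogic("t3://localhost:7001", "weblogic", "welcome123");
        // Print every setting except the password.
        env.forEach((key, value) -> {
            if (!Context.SECURITY_CREDENTIALS.equals(key)) {
                System.out.println(key + " = " + value);
            }
        });
    }
}
```

If the test connection in ODI fails, comparing the values typed into the physical server against the ones that worked in the Java client is usually the quickest first check.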

Create a logical server and assign the physical server to it for the context.
Create the model by selecting the JMS Queue technology and the logical server we created before.
Reverse engineering is not supported for the JMS technology, so we need to create the datastore for the JMS queue manually. Make sure that the JNDI name of the queue is specified correctly in the resource name field.
In the file section, configure the JMS payload format. Here I am saying that the payload is comma-separated text.

Configure the payload columns in the columns section of the datastore. Here I am configuring three columns, id, name and age, with id as the key. So the comma-separated text received in the payload is expected to have three fields.
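To make the expected payload shape concrete, here is a small sketch of how one comma-separated message body lines up with the three columns. It only illustrates the format; ODI does this mapping itself through the datastore definition and the KM. The class name PayloadRow is an assumption for this example, and so is treating id and age as integers.

```java
// Hypothetical illustration of the payload layout: id,name,age.
final class PayloadRow {
    final int id;
    final String name;
    final int age;

    PayloadRow(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    // Splits one message body into the three fields declared on the datastore.
    static PayloadRow parse(String body) {
        // A limit of -1 keeps trailing empty fields, so "8,test3," still counts as 3 fields.
        String[] fields = body.split(",", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + fields.length + ": " + body);
        }
        return new PayloadRow(
                Integer.parseInt(fields[0].trim()),
                fields[1].trim(),
                Integer.parseInt(fields[2].trim()));
    }

    public static void main(String[] args) {
        // Same message body the Java test client sent earlier.
        PayloadRow row = parse("8,test3,24");
        System.out.println("id=" + row.id + ", name=" + row.name + ", age=" + row.age);
    }
}
```

A message with fewer or more than three fields is rejected here, which mirrors the point above: the payload has to match the column list defined on the datastore.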

With this we have completed the ODI configuration for the JMS Queue. Next up we need to create the interfaces for publishing message to JMS and subscribing the message from JMS.

Create Interfaces for Publishing/Subscribing JMS Messages
The following knowledge modules are needed for loading and integration. Please import them into the ODI project. IKM SQL to JMS Append and LKM JMS to SQL are the JMS-specific KMs for integration and loading.

INT-TABLE_TO_JMS - This interface uses IKM SQL to JMS Append for integration to JMS.
INT-JMS_TO_TABLE - This interface uses LKM JMS to SQL, IKM Oracle Incremental Update and CKM Oracle for loading, integration and checking constraints.

INT-TABLE_TO_JMS interface for publishing message
Create the INT-TABLE_TO_JMS interface with the 'staging area different from target' option selected (select your source logical server as the staging area). In the mapping area, drop the table as the source (I have created a table in the HR schema with three columns and two rows of data) and the JMS datastore as the target.

The mapping is done here.
ODI automatically sets the IKM to IKM SQL to JMS Append here since we have the KM in the project. Please also note the publish commit option here. It can be set to false, so that the ODI package can do further processing and issue the commit later using a separate command in the package. This is how an XA transaction between the database and JMS resources is handled in ODI.
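What deferring the commit means for the messages can be pictured with a tiny in-memory model. This is plain Java written for this post's illustration only; it is not the JMS API and not what the KM actually runs, and the class name DeferredPublish is made up. Messages that are sent stay invisible to consumers until the commit, and a rollback discards them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a transacted publish; it is not the JMS API.
final class DeferredPublish {
    private final List<String> pending = new ArrayList<>(); // sent but not yet committed
    private final List<String> visible = new ArrayList<>(); // what consumers can see

    // Stages a message; nothing is visible to consumers yet.
    void send(String message) {
        pending.add(message);
    }

    // Makes everything sent since the last commit or rollback visible.
    void commit() {
        visible.addAll(pending);
        pending.clear();
    }

    // Discards everything sent since the last commit or rollback.
    void rollback() {
        pending.clear();
    }

    // Returns a copy so callers cannot modify the model's state.
    List<String> visibleMessages() {
        return new ArrayList<>(visible);
    }

    public static void main(String[] args) {
        DeferredPublish queue = new DeferredPublish();
        queue.send("1,test1,20");
        queue.send("2,test2,21");
        System.out.println("Before commit: " + queue.visibleMessages().size() + " visible");
        queue.commit();
        System.out.println("After commit: " + queue.visibleMessages().size() + " visible");
    }
}
```

With the commit option left at true the interface behaves as if commit() were called right after the sends; with it set to false, the later step in the package decides whether the messages become visible.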
Let's run this interface and check whether the rows are published from the table to JMS. I have two records in the table.
INSERT INTO "HR"."JMS_INPUT" (ID, NAME, AGE) VALUES ('1', 'test1', '20');
INSERT INTO "HR"."JMS_INPUT" (ID, NAME, AGE) VALUES ('2', 'test2', '21');
The interface run is successful and I could see the messages in the JMS queue.

INT-JMS_TO_TABLE interface for subscribing messages
Create the interface with the JMS datastore as the source and the table as the target in the mapping. Here the staging area is part of the target area.

In the flow area, LKM JMS to SQL is used to load the data. Also note that the commit option can be set to false here. When this interface is part of a package, the commit can then be issued separately in a procedure in the same package, if you need to combine data processing and JMS message consumption in a single transaction.
Run the interface and see whether it populates the output table.
The interface run is successful and the output table is populated from the messages.
There are no messages left in the queue and the table is populated.
In a similar way, topic messages can be consumed and messages can be published to a topic. The only difference is that a topic keeps a message until all of its subscribers have received it or until it expires.
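The difference between the two delivery styles can be sketched with a small in-memory model. Again this is only an illustration in plain Java, not the JMS API: the class name DeliveryModel is made up, and handing queue messages to consumers in round-robin order is a simplifying assumption (a real provider hands them to whichever consumer is available).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the two delivery styles; it is not the JMS API.
final class DeliveryModel {

    // Queue: each message goes to exactly one of the competing consumers.
    static Map<String, List<String>> deliverFromQueue(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin choice of consumer (an assumption of this model).
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    // Topic: every subscriber receives its own copy of each message.
    static Map<String, List<String>> deliverFromTopic(List<String> messages, List<String> subscribers) {
        Map<String, List<String>> received = emptyInboxes(subscribers);
        for (String message : messages) {
            for (String subscriber : subscribers) {
                received.get(subscriber).add(message);
            }
        }
        return received;
    }

    // Creates one empty inbox per name, keeping the given order.
    private static Map<String, List<String>> emptyInboxes(List<String> names) {
        if (names.isEmpty()) {
            throw new IllegalArgumentException("At least one consumer or subscriber is required");
        }
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String name : names) {
            inboxes.put(name, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("8,test3,24", "9,test4,24");
        List<String> readers = Arrays.asList("A", "B");
        System.out.println("Queue: " + deliverFromQueue(messages, readers));
        System.out.println("Topic: " + deliverFromTopic(messages, readers));
    }
}
```

With two messages and two readers, the queue gives each reader one message, while the topic gives both readers both messages.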

Friday, 25 October 2013

Access unread emails and save attachments from MS Exchange WebServices using Java Client


I recently had a business scenario to read new emails from Microsoft Exchange, extract the Excel attachments and load them using Oracle Data Integrator. I will show loading the Excel files with Oracle Data Integrator in my next blog. In this blog, I just want to share my experience of accessing EWS using Java.

If the Exchange admin has enabled web services, they are available through the URL <<https://[exchange web mail url]/EWS/Exchange.asmx>>. When accessed through a browser, this URL redirects to <<https://[exchange web mail url]/EWS/Services.wsdl>> and shows the WSDL for accessing the web service. Save the WSDL file to a local folder. The WSDL refers to a couple of XSDs internally, which also need to be downloaded to the same folder where the WSDL is saved. The URLs for the XSDs look like
<<https://[exchange web mail url]/EWS/messages.xsd>>
<<https://[exchange web mail url]/EWS/types.xsd>>

The WSDL provided by EWS is not a concrete WSDL: it is missing the service information. We need to add the service information manually to the file. Add the service information below to the WSDL, between definitions and binding. Make sure that you provide the web mail URL in the location attribute.


Creating the Java client from this WSDL as it is will cause an xml:lang exception because of a namespace declaration issue. To avoid it, replace the line <xs:import namespace="http://www.w3.org/XML/1998/namespace"> in types.xsd with the line <xs:import namespace="http://www.w3.org/XML/1998/namespace" schemaLocation="http://www.w3.org/2001/xml.xsd"/>. Let's go ahead and create the client.
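If you would rather script the types.xsd change than edit the file by hand, a sketch along these lines could do it. The class name TypesXsdPatch is made up for illustration, and it assumes the import appears as a single tag, either self-closing or written as shown above; reading and writing the actual file is left out.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: adds the schemaLocation attribute to the XML namespace import.
final class TypesXsdPatch {

    // Matches the import only when it has no schemaLocation attribute yet.
    private static final Pattern BARE_IMPORT = Pattern.compile(
            "<xs:import\\s+namespace=\"http://www\\.w3\\.org/XML/1998/namespace\"\\s*/?>");

    private static final String FIXED_IMPORT =
            "<xs:import namespace=\"http://www.w3.org/XML/1998/namespace\" "
            + "schemaLocation=\"http://www.w3.org/2001/xml.xsd\"/>";

    // Returns the XSD text with the bare import replaced; other text is left untouched.
    static String patch(String xsdText) {
        return BARE_IMPORT.matcher(xsdText).replaceAll(Matcher.quoteReplacement(FIXED_IMPORT));
    }

    public static void main(String[] args) {
        String before = "<xs:schema>\n"
                + "  <xs:import namespace=\"http://www.w3.org/XML/1998/namespace\"/>\n"
                + "</xs:schema>";
        System.out.println(patch(before));
    }
}
```

Because the pattern only matches an import without schemaLocation, running the patch a second time leaves the file unchanged.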

Create the Java client for the web service using the WSDL saved in the local folder. I am using JDeveloper in this example to create the Java client. Create a Java project and create a web service client and proxy from the Business Tier -> Web Services category.


If you need asynchronous methods, you can choose to generate them in the wizard; otherwise select not to generate them. I have selected not to generate asynchronous methods. We don't need to select OWSM policies here as we will handle the security part in the Java client itself. Complete the wizard. It will take a good amount of time to generate the Java objects from the WSDL and XSDs using JAX-WS and JAXB. The generated Java stubs look like the below.



Next, we just need to write a Java class that builds the service object, constructs the request object and invokes the service operation by passing the constructed request object. The service returns a response object, which needs to be read for the result.

I used the three web service operations listed below for my business case: finding unread emails, getting the details of a specific email, and saving attachments. The web service operations, with their request and response information, are documented on the Microsoft site: http://msdn.microsoft.com/enus/library/exchange/bb409286(v=exchg.150).aspx.

WS Operations:
1. findItem (retrieve all unread emails with ItemId)
2. getItem (retrieve details for specific email by passing ItemId. Fetches all attachments with AttachmentId)
3. getAttachment (Get the attachment with content by passing AttachmentId)

Creating Service with authentication
The following code creates the service object with authentication details. Please change the user, domain and password details here.



package testmailprj;

import com.microsoft.schemas.exchange.services._2006.messages.ExchangeServicePortType;
import com.microsoft.schemas.exchange.services._2006.messages.ExchangeServices;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.net.*;
import javax.xml.ws.BindingProvider;

public class EWSTest {

    public static void main(String[] args) {
        try {
            Authenticator.setDefault(new RetrieveWSDLAuthenticator("domain\\username", "password"));
            URL wsdlURL = new URL("https://<exchange web url>/ews/Services.wsdl");
            ExchangeServicePortType port = getExchangeServicePort("email id", "password",
                                                             "domain", wsdlURL);
            ItemAttachments itemAttachments = new ItemAttachments();
            itemAttachments.getAttachments(port);
        } catch (MalformedURLException ex) {
            Logger.getLogger(EWSTest.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    static class RetrieveWSDLAuthenticator extends Authenticator {
        private String username, password;

        public RetrieveWSDLAuthenticator(String user, String pass) {
            username = user;
            password = pass;
        }

        @Override
        protected PasswordAuthentication getPasswordAuthentication() {
            return new PasswordAuthentication(username, password.toCharArray());
        }
    }

    public static ExchangeServicePortType getExchangeServicePort(String username, String password, String domain, URL wsdlURL) throws MalformedURLException {
        String uid = domain + "\\" + username;

        ExchangeServices exchangeWebService = new ExchangeServices();
        ExchangeServicePortType port = exchangeWebService.getExchangeServicePort();
        ((BindingProvider)port).getRequestContext().put(BindingProvider.USERNAME_PROPERTY, uid);
        ((BindingProvider)port).getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, password);

        return port;
    }

}


The ItemAttachments class has three methods for invoking the findItem, getItem, getAttachment operations. The methods are explained below.

FindItems
EWS represents calendar entries, emails and tasks as items in the web service. Here we retrieve unread emails by constructing the request object with a restriction (for finding unread messages). Once the response is received, we loop over each message returned in it. The findItem operation returns only limited information about a message; if we need more, it is available through the getItem operation. In particular, findItem doesn't return attachment information, while getItem returns the attachment information without the actual attachment content.

getItem
The getItem operation uses a request object with a specific ItemId to retrieve the message information. The response returns all To, CC, Body and Attachment (without content) information.

getAttachment
The getAttachment operation uses a request object with a specific AttachmentId to retrieve the attachment information with content, and saves the attachment to disk.

All three operation methods look like the below.

package testmailprj;
import javax.xml.bind.JAXBElement;
import javax.xml.ws.Holder;
 
import com.microsoft.schemas.exchange.services._2006.messages.*;
import com.microsoft.schemas.exchange.services._2006.types.*;

import java.io.FileOutputStream;

import java.util.*;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;

public class ItemAttachments {

    private ExchangeSettings exchangeEnvironmentSettings = new ExchangeSettings();

    public List getAttachments(ExchangeServicePortType port) {

        final DistinguishedFolderIdType distinguishedFolderIdType = new DistinguishedFolderIdType();
        distinguishedFolderIdType.setId(DistinguishedFolderIdNameType.INBOX);

        NonEmptyArrayOfBaseFolderIdsType nonEmptyArrayOfBaseFolderIdsType = new NonEmptyArrayOfBaseFolderIdsType();
        nonEmptyArrayOfBaseFolderIdsType.getFolderIdOrDistinguishedFolderId().add(distinguishedFolderIdType);
 

        final ItemResponseShapeType itemResponseShapeType = new ItemResponseShapeType();
        itemResponseShapeType.setBaseShape(DefaultShapeNamesType.ALL_PROPERTIES);
 

        FindItemType request = new FindItemType();
        request.setTraversal(ItemQueryTraversalType.SHALLOW); // SHALLOW means it doesn't look for "soft deleted" items.
        request.setItemShape(itemResponseShapeType);
        request.setParentFolderIds(nonEmptyArrayOfBaseFolderIdsType);
        
        RestrictionType restriction = new RestrictionType();
        IsEqualToType isEqualTo = new IsEqualToType();
        PathToUnindexedFieldType pathToFieldType = new PathToUnindexedFieldType();
        pathToFieldType.setFieldURI(UnindexedFieldURIType.MESSAGE_IS_READ);
        FieldURIOrConstantType constantType = new FieldURIOrConstantType();
        ConstantValueType constantValueType = new ConstantValueType();
        constantValueType.setValue("0");
        constantType.setConstant(constantValueType);
        isEqualTo.setFieldURIOrConstant(constantType);
        com.microsoft.schemas.exchange.services._2006.types.ObjectFactory objectFactory = new com.microsoft.schemas.exchange.services._2006.types.ObjectFactory();
        JAXBElement fieldUriExpression = objectFactory.createFieldURI(pathToFieldType);
        isEqualTo.setPath(fieldUriExpression);
        JAXBElement isEqualToExpression = objectFactory.createIsEqualTo(isEqualTo);
        restriction.setSearchExpression(isEqualToExpression);
        request.setRestriction(restriction);
 
        FindItemResponseType findItemResponse = new FindItemResponseType();
        Holder findItemResult = new Holder(findItemResponse);
        //CommonUtils.displayXML(request);
        port.findItem(request, exchangeEnvironmentSettings.getMailboxCulture(), exchangeEnvironmentSettings.getRequestServerVersion(), findItemResult, exchangeEnvironmentSettings.getServerVersionInfoHolder()); 
        List items = new ArrayList();
 
        FindItemResponseType response = (FindItemResponseType) findItemResult.value;
        //CommonUtils.displayXML(response);
        ArrayOfResponseMessagesType arrayOfResponseMessagesType = response.getResponseMessages();
        List responseMessageTypeList = arrayOfResponseMessagesType.getCreateItemResponseMessageOrDeleteItemResponseMessageOrGetItemResponseMessage(); // Note: Best method name... ever!
 
        Iterator responseMessagesIterator = responseMessageTypeList.iterator();
 

        while (responseMessagesIterator.hasNext()) {
            JAXBElement jaxBElement = (JAXBElement) responseMessagesIterator.next();
            FindItemResponseMessageType findItemResponseMessageType = (FindItemResponseMessageType) jaxBElement.getValue();
            FindItemParentType findItemParentType = findItemResponseMessageType.getRootFolder();
 
            if (findItemParentType != null) {

                ArrayOfRealItemsType arrayOfRealItemsType = findItemParentType.getItems();
                List itemList = arrayOfRealItemsType.getItemOrMessageOrCalendarItem();
                 
                Iterator itemListIter = itemList.iterator();
                while (itemListIter.hasNext()) {
                    ItemType itemType = (ItemType) itemListIter.next();
                    MessageType messageType = (MessageType) itemType;

                    System.out.println(itemType.getSubject() + " has attachments ? " + itemType.isHasAttachments());
                    if(messageType.isHasAttachments()){
                        System.out.println("-----------Message has attachments. Getting more details here-----------");
                        getItem(port,messageType.getItemId());
                        System.out.println("-----------Finished getting details for this message-----------");
                    }
                    items.add(itemType);
                }
            }

        }
 
        return items;
    }
    public List getItem(ExchangeServicePortType port,ItemIdType id) {
 
        NonEmptyArrayOfBaseItemIdsType neabaseIds = new NonEmptyArrayOfBaseItemIdsType();
        neabaseIds.getItemIdOrOccurrenceItemIdOrRecurringMasterItemId().add(id);
        ItemResponseShapeType itemResponseShape = new ItemResponseShapeType();
        itemResponseShape.setBaseShape(DefaultShapeNamesType.ALL_PROPERTIES);
        itemResponseShape.setBodyType(BodyTypeResponseType.BEST);
        GetItemType request = new GetItemType();
        request.setItemIds(neabaseIds);
        request.setItemShape(itemResponseShape);

        GetItemResponseType getItemResponse = new GetItemResponseType();
        Holder getItemResult = new Holder(getItemResponse);
        //CommonUtils.displayXML(request);

        port.getItem(request, exchangeEnvironmentSettings.getMailboxCulture(), exchangeEnvironmentSettings.getRequestServerVersion(), getItemResult, exchangeEnvironmentSettings.getServerVersionInfoHolder()); 

        List items = new ArrayList();
    
        GetItemResponseType response = (GetItemResponseType) getItemResult.value;
        //CommonUtils.displayXML(response);
        ArrayOfResponseMessagesType arrayOfResponseMessagesType = response.getResponseMessages();
        List responseMessageTypeList = arrayOfResponseMessagesType.getCreateItemResponseMessageOrDeleteItemResponseMessageOrGetItemResponseMessage(); // Note: Best method name... ever!
    
        Iterator responseMessagesIterator = responseMessageTypeList.iterator();
    

        while (responseMessagesIterator.hasNext()) {
            JAXBElement jaxBElement = (JAXBElement) responseMessagesIterator.next();
            ItemInfoResponseMessageType getItemResponseMessageType = (ItemInfoResponseMessageType) jaxBElement.getValue();
            ArrayOfRealItemsType arrayOfRealItemsType = getItemResponseMessageType.getItems();

                List itemList = arrayOfRealItemsType.getItemOrMessageOrCalendarItem();
                 
                Iterator itemListIter = itemList.iterator();
                while (itemListIter.hasNext()) {
                    ItemType itemType = (ItemType) itemListIter.next();
                    MessageType messageType = (MessageType) itemType;

                    System.out.println(itemType.getSubject() + " has attachments ? " + itemType.isHasAttachments());
                    if(messageType.isHasAttachments()){
                        System.out.println("Message has attachments");
                        // typed list so the for-each loop below compiles
                        List<AttachmentType> attachments = messageType.getAttachments().getItemAttachmentOrFileAttachment();
                        for (AttachmentType attachment: attachments){
                            System.out.println(" attachment details id: " + attachment.getAttachmentId().getId());
                            System.out.println(" attachment details content location: " + attachment.getContentType());
                            System.out.println(" attachment details name and size: " + attachment.getName() + " " + attachment.getSize());
                            getAttachment(port,attachment.getAttachmentId());
                        }
                    }
                    items.add(itemType);
                }
        }
    
        return items;
    }
    public List getAttachment(ExchangeServicePortType port,AttachmentIdType id) {

        NonEmptyArrayOfRequestAttachmentIdsType neabaseIds = new NonEmptyArrayOfRequestAttachmentIdsType();
        neabaseIds.getAttachmentId().add(id);
        AttachmentResponseShapeType attachmentResponseShape = new AttachmentResponseShapeType();
        attachmentResponseShape.setBodyType(BodyTypeResponseType.BEST);
        attachmentResponseShape.setIncludeMimeContent(Boolean.TRUE);
        GetAttachmentType request = new GetAttachmentType();
        request.setAttachmentIds(neabaseIds);
        request.setAttachmentShape(attachmentResponseShape);

        GetAttachmentResponseType getAttachmentResponse = new GetAttachmentResponseType();
        Holder getAttachmentResult = new Holder(getAttachmentResponse);
        CommonUtils.displayXML(request);
        port.getAttachment(request, exchangeEnvironmentSettings.getMailboxCulture(), exchangeEnvironmentSettings.getRequestServerVersion(), getAttachmentResult, exchangeEnvironmentSettings.getServerVersionInfoHolder());

        List items = new ArrayList();
    
        GetAttachmentResponseType response = (GetAttachmentResponseType) getAttachmentResult.value;
        CommonUtils.displayXML(response);
        ArrayOfResponseMessagesType arrayOfResponseMessagesType = response.getResponseMessages();
        List responseMessageTypeList = arrayOfResponseMessagesType.getCreateItemResponseMessageOrDeleteItemResponseMessageOrGetItemResponseMessage(); // Note: Best method name... ever!
    
        Iterator responseMessagesIterator = responseMessageTypeList.iterator();
    

        while (responseMessagesIterator.hasNext()) {
            JAXBElement jaxBElement = (JAXBElement) responseMessagesIterator.next();
            AttachmentInfoResponseMessageType getAttachmentResponseMessageType = (AttachmentInfoResponseMessageType) jaxBElement.getValue();
            ArrayOfAttachmentsType arrayOfAttachmentsType = getAttachmentResponseMessageType.getAttachments();

                List itemList = arrayOfAttachmentsType.getItemAttachmentOrFileAttachment();
                 
                Iterator attachmentListIter = itemList.iterator();
                while (attachmentListIter.hasNext()) {
                    FileAttachmentType attachmentType = (FileAttachmentType) attachmentListIter.next();

                    System.out.println( attachmentType.getName()+ " attachment is " + attachmentType.getContentType());
                    byte[] content = attachmentType.getContent();
                    // try-with-resources closes the stream even if the write fails
                    try (FileOutputStream fos = new FileOutputStream("c:\\temp\\" + attachmentType.getName())) {
                        fos.write(content);
                    } catch (Exception ex){
                        ex.printStackTrace();
                    }
                    items.add(attachmentType);
                }
        }
    
        return items;
    }
}
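The saving step in getAttachment writes straight to c:\temp using whatever name the attachment carries. As an optional hardening idea, not something the project above does, the file name could be cleaned and the target folder passed in. The sketch below uses only the JDK; the class name AttachmentSaver, the fallback name attachment.bin and the set of allowed characters are all assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: saves attachment bytes under a cleaned-up file name.
final class AttachmentSaver {

    // Keeps only the last path segment and replaces characters outside a small safe set.
    static String safeFileName(String attachmentName) {
        String name = attachmentName == null ? "" : attachmentName;
        int lastSeparator = Math.max(name.lastIndexOf('/'), name.lastIndexOf('\\'));
        name = name.substring(lastSeparator + 1).replaceAll("[^A-Za-z0-9._ -]", "_");
        // Avoid empty names and the special "." and ".." entries.
        if (name.isEmpty() || name.equals(".") || name.equals("..")) {
            return "attachment.bin";
        }
        return name;
    }

    // Writes the content into targetDir (created if missing) and returns the file path.
    static Path save(Path targetDir, String attachmentName, byte[] content) {
        try {
            Files.createDirectories(targetDir);
            Path target = targetDir.resolve(safeFileName(attachmentName));
            return Files.write(target, content);
        } catch (IOException ex) {
            throw new UncheckedIOException("Could not save attachment " + attachmentName, ex);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ews-attachments");
        Path saved = save(dir, "..\\..\\report.xls", new byte[] {1, 2, 3});
        System.out.println("Saved " + Files.size(saved) + " bytes to " + saved);
    }
}
```

In the listing above, the call would replace the FileOutputStream block, for example save(Paths.get("c:\\temp"), attachmentType.getName(), content).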

The JDeveloper project is attached for download here: EWSProject