Question

What options are available for developing Java applications against Service Bus for Windows Server?

  1. Java Message Broker API - This needs ACS to work, which Service Bus for Windows Server doesn't support.
  2. AMQP - This doesn't seem to work on Service Bus for Windows Server; I keep getting this error:

    org.apache.qpid.amqp_1_0.client.Sender$SenderCreationException: Peer did not create remote endpoint for link, target:

The same code works with Azure Service Bus, so is AMQP on Service Bus for Windows Server not fully working?

Please correct me if I have missed something.

Update: To test AMQP on my local machine, this is what I did:

  1. Installed Service Bus 1.1 on my local machine
  2. Took the sample from http://www.windowsazure.com/en-us/develop/java/how-to-guides/service-bus-amqp/
  3. Created a new namespace on my local machine
  4. Specified the following connection string in servicebus.properties (which is correctly referenced in the code):

    connectionfactory.SBCF = amqps://<username>:<password>@<MachineName>:5671/StringAnalyzerNS/

    queue.QUEUE = queue1

The code has been updated to use the certificates.

At runtime I get this error:

javax.jms.JMSException: Peer did not create remote endpoint for link, target: queue1
    at org.apache.qpid.amqp_1_0.jms.impl.MessageProducerImpl.<init>(MessageProducerImpl.java:77)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createProducer(SessionImpl.java:348)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createProducer(SessionImpl.java:63)
    at com.stringcompany.Analyzer.SimpleSenderReceiver.<init>(SimpleSenderReceiver.java:70)
    at com.stringcompany.Analyzer.SimpleSenderReceiver.main(SimpleSenderReceiver.java:95)
Caused by: org.apache.qpid.amqp_1_0.client.Sender$SenderCreationException: Peer did not create remote endpoint for link, target: queue1
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:171)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:104)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:97)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:83)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:69)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:63)
    at org.apache.qpid.amqp_1_0.client.Session.createSender(Session.java:74)
    at org.apache.qpid.amqp_1_0.client.Session.createSender(Session.java:66)
    at org.apache.qpid.amqp_1_0.jms.impl.MessageProducerImpl.<init>(MessageProducerImpl.java:72)
    ... 4 more
javax.jms.JMSException: Session remotely closed

With the same code, if I point to Azure Service Bus by setting the namespace and queue as below:

    connectionfactory.SBCF = amqps://<Policy name>:<Sec. Key>@<ns>.servicebus.windows.net
    queue.QUEUE = testq

it works and messages are exchanged.

Here is the code if someone wants to try it

package com.stringcompany.Analyzer;

//SimpleSenderReceiver.java

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class SimpleSenderReceiver implements MessageListener {
    private static boolean runReceiver = true;
    private Connection connection;
    private Session sendSession;
    private Session receiveSession;
    private MessageProducer sender;
    private MessageConsumer receiver;
    private static Random randomGenerator = new Random();

public SimpleSenderReceiver() throws Exception {
    // Configure JNDI environment
    Hashtable<String, String> env = new Hashtable<String, String>();
    env.put(Context.INITIAL_CONTEXT_FACTORY,
            "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
    env.put(Context.PROVIDER_URL, "D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\servicebus.properties");
    Context context = new InitialContext(env);

    // Lookup ConnectionFactory and Queue
    ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

    System.out.println("cf:"+cf);

    // Create Connection
    connection = cf.createConnection();
    System.out.println("connection :"+connection);

    connection.setExceptionListener(new ExceptionListener() {

        public void onException(JMSException arg0) {
            System.err.println(arg0);

        }
    });
    connection.start();



    // Create sender-side Session and MessageProducer
    sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    System.out.println("Session open");


    Destination queue = (Destination) context.lookup("QUEUE");
    System.out.println("queue:"+queue);

    sender = sendSession.createProducer(queue);
    Queue q=(Queue) queue;
    System.out.println(sender.getDestination());
    System.out.println("sender:"+sender);
    if (runReceiver) {
        System.out.println("Waiting for new message");
        // Create receiver-side Session, MessageConsumer,and MessageListener
        receiveSession = connection.createSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        receiver = receiveSession.createConsumer(queue);
        receiver.setMessageListener(this);
        connection.start();
    }
}

public static void main(String[] args) {
    try {

        if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
            runReceiver = false;
        }

        //System.setProperty("javax.net.debug","ssl");
        System.setProperty("javax.net.ssl.trustStore","D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\SBKeystore.keystore");
        System.setProperty("log4j.configuration","D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\log4j.properties");
        SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
        System.out
                .println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
        BufferedReader commandLine = new java.io.BufferedReader(
                new InputStreamReader(System.in));

        while (true) {
            String s = "Message";//commandLine.readLine();
            if (s.equalsIgnoreCase("exit")) {
                simpleSenderReceiver.close();
                System.exit(0);
            } else {
                simpleSenderReceiver.sendMessage();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private void sendMessage() throws JMSException {
    TextMessage message = sendSession.createTextMessage();
    message.setText("Test AMQP message from JMS");
    long randomMessageID = randomGenerator.nextLong() >>> 1;
    message.setJMSMessageID("ID:" + randomMessageID);
    sender.send(message);
    System.out.println("Sent message with JMSMessageID = "
            + message.getJMSMessageID());
}

public void close() throws JMSException {
    connection.close();
}

public void onMessage(Message message) {
    try {
        System.out.println("Received message with JMSMessageID = "
                + message.getJMSMessageID());
        message.acknowledge();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

  }


Solution

We had the same problem, and thankfully Microsoft updated their documentation to show how to do this correctly: http://msdn.microsoft.com/en-us/library/dn574799.aspx

Other suggestions

The simplest answer to the question is that you should URL-encode the SASPolicyKey:

connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

where SASPolicyKey is URL-encoded.
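
SAS keys are base64 strings and often contain '+', '/' or '=', which are not safe in the user-info part of a URI. A minimal sketch of producing the encoded value with the JDK's URLEncoder follows. The class name, policy name, key and namespace are made-up placeholders, not values from this thread. Note that URLEncoder does form encoding (a space becomes '+'), which is harmless for base64 keys since they contain no spaces.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: builds the connectionfactory value for servicebus.properties.
public class SasConnectionString {

    // Percent-encodes characters such as '+', '/' and '=' found in base64 keys.
    static String encode(String value) throws UnsupportedEncodingException {
        return URLEncoder.encode(value, "UTF-8");
    }

    // Assumed layout, taken from the property line above:
    // amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net
    static String build(String policyName, String policyKey, String namespace)
            throws UnsupportedEncodingException {
        return "amqps://" + encode(policyName) + ":" + encode(policyKey)
                + "@" + namespace + ".servicebus.windows.net";
    }

    public static void main(String[] args) throws Exception {
        // Placeholder values for illustration only.
        String url = build("SendListenPolicy", "abc+def/ghi=", "myns");
        System.out.println("connectionfactory.SBCF = " + url);
    }
}
```

Paste the printed line into servicebus.properties rather than encoding the key by hand.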

AMQP 1.0 is supported with Service Bus 1.1 for Windows Server. There are two basic differences between cloud and on-premises use of AMQP in Service Bus:

  1. Addressing: You will need to build the AMQP connection string yourself (and will need DNS if you want HA).
  2. Authentication: You will need to use domain-joined accounts, as ACS is not available on-premises. You will also need to distribute your Service Bus certificate to your clients.
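
As a rough sketch of the addressing point, the on-premises connection string can be assembled from its parts. The layout (host, port 5671, namespace as the path) is copied from the connection string in the question. Encoding both the user name and the password is an assumption based on the SAS advice above, and the account format (user@domain here) is also an assumption; check the MSDN page linked in the accepted answer for what the broker actually expects. All names and values are placeholders.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for an on-premises Service Bus 1.1 AMQP connection string.
public class OnPremConnectionString {

    static String encode(String value) throws UnsupportedEncodingException {
        return URLEncoder.encode(value, "UTF-8");
    }

    // Layout assumed from the question:
    // amqps://<username>:<password>@<MachineName>:5671/<namespace>
    static String build(String user, String password, String host, int port, String namespace)
            throws UnsupportedEncodingException {
        return "amqps://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port + "/" + namespace;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder domain account and host; the '@' and '/' must not appear raw in the user-info part.
        String url = build("user@contoso.com", "p@ss/word", "sbhost.contoso.com", 5671, "StringAnalyzerNS");
        System.out.println("connectionfactory.SBCF = " + url);
    }
}
```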

OK, I have sorted out the first issue (the Java Message Broker API not supporting SAS endpoints) by writing a wrapper that works seamlessly with the existing API. You can get the library from the GitHub repository below. With this, I can develop and test my Java application on a local Service Bus environment and host it on Azure or an on-premises Service Bus farm.

https://github.com/Dhana-Krishnasamy/ServiceBusForWindows-SASWrapper

The sender and receiver queues have to be configured differently. Here is an example of my working configuration (servicebus.properties):

connectionfactory.SBCF = amqps://$PolicyName:$UrlEncodedKey@$Your-EventHub-NamespaceName.servicebus.windows.net

queue.EventHubSender=$YourEventHubName
queue.EventHubReceiver=$YourEventHubName/ConsumerGroups/$YourConsumerGroupName/Partitions/1

Replace the '$' items with your own values. The shared policy key has to be URL-encoded. Make sure that your sender references the 'EventHubSender' defined in this config and the receiver references 'EventHubReceiver'.
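
The two queue entries differ only in the address: the sender targets the hub itself, while the receiver targets one partition of one consumer group. A small sketch that generates both property lines from the pattern above, so that a receiver entry per partition can be produced without typos. The class name, hub name and consumer group are hypothetical values for illustration.

```java
// Hypothetical helper that formats the Event Hub addresses used in servicebus.properties.
public class EventHubAddresses {

    // The sender address is just the hub name.
    static String senderAddress(String hub) {
        return hub;
    }

    // Receiver address pattern from the configuration above:
    // <hub>/ConsumerGroups/<group>/Partitions/<partition>
    static String receiverAddress(String hub, String consumerGroup, int partition) {
        return hub + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partition;
    }

    public static void main(String[] args) {
        String hub = "myhub";        // placeholder hub name
        String group = "$Default";   // placeholder consumer group name
        System.out.println("queue.EventHubSender=" + senderAddress(hub));
        System.out.println("queue.EventHubReceiver=" + receiverAddress(hub, group, 1));
    }
}
```

Each partition needs its own receiver entry (and its own MessageConsumer), since a receiver address names exactly one partition.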

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow