Hi, we had the same problems, and thankfully MS has updated its documentation to show how to do this correctly: http://msdn.microsoft.com/en-us/library/dn574799.aspx
Java on Service Bus for Windows Server: Options?
07-08-2022
Question
What are the options available to develop Java applications using Service Bus for Windows?
- Java Message Broker API - This needs ACS to work, which SB for Windows doesn't support.
- AMQP - This doesn't seem to work on SB for Windows. I keep getting this error:
org.apache.qpid.amqp_1_0.client.Sender$SenderCreationException: Peer did not create remote endpoint for link, target:
The same code works with Azure SB, so AMQP on SB for Windows seems not to be fully working.
Please correct me if I have missed something.
Update: To test AMQP on my local machine, this is what I did:
- Installed Service Bus 1.1 on my local machine
- Took the sample mentioned here http://www.windowsazure.com/en-us/develop/java/how-to-guides/service-bus-amqp/
- Created a new namespace on my local machine
- Specified the following connection string in servicebus.properties (which is correctly referenced in the code):
connectionfactory.SBCF = amqps://<username>:<password>@<MachineName>:5671/StringAnalyzerNS/
queue.QUEUE = queue1
The code is updated with certificates.
At runtime I get this error:
javax.jms.JMSException: Peer did not create remote endpoint for link, target: queue1
    at org.apache.qpid.amqp_1_0.jms.impl.MessageProducerImpl.<init>(MessageProducerImpl.java:77)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createProducer(SessionImpl.java:348)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createProducer(SessionImpl.java:63)
    at com.stringcompany.Analyzer.SimpleSenderReceiver.<init>(SimpleSenderReceiver.java:70)
    at com.stringcompany.Analyzer.SimpleSenderReceiver.main(SimpleSenderReceiver.java:95)
Caused by: org.apache.qpid.amqp_1_0.client.Sender$SenderCreationException: Peer did not create remote endpoint for link, target: queue1
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:171)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:104)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:97)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:83)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:69)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:63)
    at org.apache.qpid.amqp_1_0.client.Session.createSender(Session.java:74)
    at org.apache.qpid.amqp_1_0.client.Session.createSender(Session.java:66)
    at org.apache.qpid.amqp_1_0.jms.impl.MessageProducerImpl.<init>(MessageProducerImpl.java:72)
    ... 4 more
javax.jms.JMSException: Session remotely closed
With the same code, if I point to Azure Service Bus by setting the SB namespace and queue as below:
connectionfactory.SBCF = amqps://<Policy name>:<Sec. Key>@<ns>.servicebus.windows.net
queue.QUEUE = testq
This works; messages are exchanged.
Here is the code if someone wants to try it:
package com.stringcompany.Analyzer;

//SimpleSenderReceiver.java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class SimpleSenderReceiver implements MessageListener {
    private static boolean runReceiver = true;
    private Connection connection;
    private Session sendSession;
    private Session receiveSession;
    private MessageProducer sender;
    private MessageConsumer receiver;
    private static Random randomGenerator = new Random();

    public SimpleSenderReceiver() throws Exception {
        // Configure JNDI environment
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
        env.put(Context.PROVIDER_URL, "D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\servicebus.properties");
        Context context = new InitialContext(env);
        // Lookup ConnectionFactory and Queue
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        System.out.println("cf:"+cf);
        // Create Connection
        connection = cf.createConnection();
        System.out.println("connection :"+connection);
        connection.setExceptionListener(new ExceptionListener() {
            public void onException(JMSException arg0) {
                System.err.println(arg0);
            }
        });
        connection.start();
        // Create sender-side Session and MessageProducer
        sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        System.out.println("Session open");
        Destination queue = (Destination) context.lookup("QUEUE");
        System.out.println("queue:"+queue);
        sender = sendSession.createProducer(queue);
        Queue q=(Queue) queue;
        System.out.println(sender.getDestination());
        System.out.println("sender:"+sender);
        if (runReceiver) {
            System.out.println("Waiting for new message");
            // Create receiver-side Session, MessageConsumer, and MessageListener
            receiveSession = connection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            receiver = receiveSession.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
        }
    }

    public static void main(String[] args) {
        try {
            if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
                runReceiver = false;
            }
            //System.setProperty("javax.net.debug","ssl");
            System.setProperty("javax.net.ssl.trustStore","D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\SBKeystore.keystore");
            System.setProperty("log4j.configuration","D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\log4j.properties");
            SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
            System.out.println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
            BufferedReader commandLine = new java.io.BufferedReader(
                    new InputStreamReader(System.in));
            while (true) {
                String s = "Message";//commandLine.readLine();
                if (s.equalsIgnoreCase("exit")) {
                    simpleSenderReceiver.close();
                    System.exit(0);
                } else {
                    simpleSenderReceiver.sendMessage();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendMessage() throws JMSException {
        TextMessage message = sendSession.createTextMessage();
        message.setText("Test AMQP message from JMS");
        long randomMessageID = randomGenerator.nextLong() >>> 1;
        message.setJMSMessageID("ID:" + randomMessageID);
        sender.send(message);
        System.out.println("Sent message with JMSMessageID = "
                + message.getJMSMessageID());
    }

    public void close() throws JMSException {
        connection.close();
    }

    public void onMessage(Message message) {
        try {
            System.out.println("Received message with JMSMessageID = "
                    + message.getJMSMessageID());
            message.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Solution
Other suggestions
The simplest answer to the question is that you should URL-encode the SASPolicyKey:
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net
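where SASPolicyKey is URL-encoded. The key is Base64 text, so it can contain characters such as '+', '/' and '=' that are not safe to put unescaped into the user-info part of a URL.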
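As a minimal sketch of that encoding step, the snippet below builds the connection URL with the standard java.net.URLEncoder. The class name, method name, and the sample policy name, key, and namespace are made up for illustration; only the URL shape comes from the line above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: builds the amqps:// URL with a URL-encoded SAS key.
class SasConnectionString {

    static String build(String policyName, String policyKey, String namespace) {
        try {
            // Percent-encodes '+', '/' and '=' (all common in Base64 keys).
            String encodedKey = URLEncoder.encode(policyKey, "UTF-8");
            return "amqps://" + policyName + ":" + encodedKey
                    + "@" + namespace + ".servicebus.windows.net";
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to exist on every JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample values only; substitute your own policy name, key, and namespace.
        System.out.println(build("RootManageSharedAccessKey", "ab+c/d=", "myns"));
    }
}
```

The resulting string is what would go on the connectionfactory.SBCF line of servicebus.properties.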
AMQP 1.0 is supported with Service Bus 1.1 for Windows Server. Basically, there are two differences between the cloud and on-prem usage of AMQP in Service Bus:
1. Addressing: You will need to build the AMQP connection strings yourself (and will need DNS if you are looking for HA).
2. Authentication: You will need to use domain-joined accounts, as ACS is not available on-prem. You will also need to distribute your SB certificate to your clients.
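For the certificate part, a Java client typically has to trust the SB certificate before it opens the amqps connection. The sketch below uses the standard JSSE system properties, as the question's code does; it assumes the certificate has already been imported into a keystore file, and the class name, file name, and password shown are placeholders.

```java
// Hypothetical helper: points the JVM's default SSL trust store at a keystore
// that is assumed to already contain the Service Bus certificate.
class TrustStoreSetup {

    static void useTrustStore(String trustStorePath, String trustStorePassword) {
        // Standard JSSE properties; they must be set before the first SSL connection is made.
        System.setProperty("javax.net.ssl.trustStore", trustStorePath);
        if (trustStorePassword != null) {
            System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword);
        }
    }

    public static void main(String[] args) {
        // Placeholder path and password; use the keystore distributed to your clients.
        useTrustStore("SBKeystore.keystore", "changeit");
        System.out.println(System.getProperty("javax.net.ssl.trustStore"));
    }
}
```

Call it at the start of main, before the JMS connection is created.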
OK, I have sorted out the first issue (the Java Message Broker API not supporting the SAS endpoint) by writing a wrapper that works seamlessly with the existing API. You can get the library from this GitHub repository. With this, I can develop and test my Java application on a local Service Bus environment and host it on Azure or on an on-premises Service Bus farm.
https://github.com/Dhana-Krishnasamy/ServiceBusForWindows-SASWrapper
You have to configure the sender and receiver queues differently. Here is an example of my working configuration (servicebus.properties):
connectionfactory.SBCF = amqps://$PolicyName:$UrlEncodedKey@$Your-EventHub-NamespaceName.servicebus.windows.net
queue.EventHubSender=$YourEventHubName
queue.EventHubReceiver=$YourEventHubName/ConsumerGroups/$YourConsumerGroupName/Partitions/1
Replace the '$' items with your own values. The shared policy key has to be URL-encoded. Make sure that your sender references the 'EventHubSender' entry defined in this config and that the receiver references 'EventHubReceiver'.
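If you prefer to generate these entries rather than hand-edit them, the following sketch assembles the same three properties in a java.util.Properties object. The class and method names and all sample values are hypothetical; the key names and the receiver path layout are taken from the configuration above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Properties;

// Hypothetical helper: assembles the servicebus.properties entries shown above.
class EventHubProperties {

    static Properties build(String policyName, String policyKey, String namespace,
                            String eventHub, String consumerGroup, int partition) {
        String encodedKey;
        try {
            encodedKey = URLEncoder.encode(policyKey, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
        Properties props = new Properties();
        props.setProperty("connectionfactory.SBCF",
                "amqps://" + policyName + ":" + encodedKey + "@" + namespace + ".servicebus.windows.net");
        // The sender addresses the Event Hub by name only.
        props.setProperty("queue.EventHubSender", eventHub);
        // The receiver addresses one partition of one consumer group.
        props.setProperty("queue.EventHubReceiver",
                eventHub + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partition);
        return props;
    }

    public static void main(String[] args) {
        // Sample values only.
        Properties p = build("SendRule", "k+/=", "myns", "hub1", "$Default", 1);
        for (String name : p.stringPropertyNames()) {
            System.out.println(name + " = " + p.getProperty(name));
        }
    }
}
```

The sender and receiver then look up "EventHubSender" and "EventHubReceiver" through JNDI, the same way the question's code looks up "QUEUE".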
Grab the Azure Java SDK from http://www.windowsazure.com/en-us/develop/java/ and then follow this guide: http://www.windowsazure.com/en-us/develop/java/how-to-guides/service-bus-queues/