Question

Je fais un POC pour pousser les données d'un serveur (Java), bien que DataService de LCDS 3.1 à l'aide de RTMP.

La configuration est ok. Adobe Air Client DataMessage sur serveur (+ Enregistrement de l'assembleur en DB): OK

J'ai trouvé beaucoup d'exemples avec AsyncMessage, mais comme il s'agit d'une destination RTMP via un service DataService, je dois envoyer un dataMessage.

Il y a des bogues (ou je manque des choses / bon doc API!).

Alors s'il vous plaît, pourriez-vous m'aider?

Voici le code qui fait la poussée. La méthode clé est dopush ()

package mypackage.lcds.service.ds.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;

import mypackage.lcds.service.ds.DataPushService;
import mypackage.model.dto.AbstractDto;
import mypackage.model.exception.DsPushException;
import flex.data.messages.DataMessage;
import flex.messaging.MessageBroker;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.util.UUIDUtils;

/**
* Implementation of {@link DataPushService}.
*/
// see http://forums.adobe.com/thread/580667
// MessageCLient :
// http://livedocs.adobe.com/livecycle/8.2/programLC/programmer/lcds/help .html?content=lcconnections_2.html
@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

    /**
     * Destination name for Data-service.<br>
     * See data-management-config.XML.
     */
    private static final String DESTINATION_NAME__POC_DS_XCHANGE = "poc-ds-xchange";

    /**
     * See data-management-config.XML.
     */
    private static final String PUSH_DTO_SERVICE__NAME = "data-service";

    /**
     * set "manually" by Spring (contexts workaround; not autowired).
     */
    private MessageBroker messageBroker = null;

    /**
     * Does the push of a single DTO.<br>
     * Only subscriberId's that are {@link Long} values will be used. Other Id's do not get a Message sent.
     *
     * @param dto
     *            {@link AbstractDto} object.
     * @param subscriberIds
     *            {@link Set} of LCDS Message subscriber IDs {@link Long}. If null, sends to all connected clients.
     *
     * @throws DsPushException
     *             if any error
     */
    @SuppressWarnings("unchecked")
    private void doPush(final AbstractDto dto, final Set<Long> subscriberIds)
            throws DsPushException {

        Set<?> ids = new HashSet<Object>();

        // obtain message service by means of message broker
        MessageService messageService = this.getMessageService();

        DataMessage message = this.createMessage(dto, messageService);

        // fill ids
        if ((subscriberIds == null) || (subscriberIds.isEmpty())) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message all currently connected subscriberIds ");
            }

            Set idsFromDS = messageService.getSubscriberIds(message, true);
            if ((idsFromDS != null) && (!idsFromDS.isEmpty())) {
                CollectionUtils.addAll(ids, idsFromDS.iterator());
            }
        } else {
            CollectionUtils.addAll(ids, subscriberIds.iterator());
        }

        if (ids.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No subscriberId to send the Message to.");
                LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true).toString());
            }
        } else {

            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message to subscriberIds : " + subscriberIds.toString());
                LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true).toString());
            }

            // send messages to all subscriberIds 1 by 1
            Object responsePayload = null;
            boolean isSent = false;
            for (Object id : ids) {

                if (id instanceof Long) {
                    try {
                        message.setHeader(Message.DESTINATION_CLIENT_ID_HEADER, id);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sending LCDS DataMessage to subscriber [" + id + "] \n" + message.toString(2));
                        }
                        responsePayload = messageService.serviceMessage(message, true);

                        // no exception ==> means OK?
                        // TODO TEST retuned payload
                        isSent = true;

                    } catch (Exception e) {
                        LOG.error("Error while sending message to subscriberId " + id, e);
                        isSent = false;
                    } finally {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Message sent to '" + String.valueOf(id) + "' : " + String.valueOf(isSent));
                        }
                    }
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("Avoiding subscriber ID (not a Long value) : " + String.valueOf(id));
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see DataPushService#pushToAllClients(AbstractDto)
     */
    // TODO test : if client is not connected, does LCDS record it for later (offline mode on the server?)
    public void pushToAllClients(final AbstractDto dto) throws DsPushException {
        this.doPush(dto, null);
    }

    public void pushTo1Client(AbstractDto dto, Long subscriberId) throws DsPushException {
        Set<Long> subscriberIds = new HashSet<Long>();
        subscriberIds.add(subscriberId);

        this.doPush(dto, subscriberIds);
    }

    /**
     * {@inheritDoc}<br>
     * subscriberIds refer to the 'clientId' set by the client app when it subscribes to the DS destination.
     *
     * @see DataPushService#pushToClients(AbstractDto, Set)
     */
    public void pushToClients(final AbstractDto dto, final Set<Long> subscriberIds) throws DsPushException {
        this.doPush(dto, subscriberIds);
    }

    @SuppressWarnings("unchecked")
    private DataMessage createMessage(final AbstractDto dto, final MessageService messageService) {
        DataMessage msg = new DataMessage();
        msg.setClientId(getServerId());
        msg.setTimestamp(System.currentTimeMillis());
        msg.setMessageId(UUIDUtils.createUUID(true));
        msg.setCorrelationId(msg.getMessageId()); // TODO OK messageId == CorrelationId ?
        msg.setDestination(DESTINATION_NAME__POC_DS_XCHANGE);
        msg.setBody(dto);
        msg.setOperation(DataMessage.CREATE_AND_SEQUENCE_OPERATION); // TODO OK operation?

        Map identity = new HashMap(2);
        // see data-management-config.xml
        identity.put("id", dto.getId());
        msg.setIdentity(identity);

        // FIXME set priority. How?
        if (LOG.isDebugEnabled()) {
            LOG.debug("LCDS DataMessage created : \n" + msg.toString(2));
        }
        return msg;
    }

    private Object getServerId() {
        // FIXME OK?
        return "X-BACKEND";
    }

    /**
     * Get the current {@link MessageBroker}'s service layer.
     *
     * @return {@link MessageService} to use for push data
     */
    private MessageService getMessageService() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting MessageBroker's DataService service ");
        }

        // Was : return (MessageService) MessageBroker.getMessageBroker(null).getService(PUSH_DTO_SERVICE__NAM E);
        return (MessageService) this.messageBroker.getService(PUSH_DTO_SERVICE__NAME);
    }

    /**
     * Set the messageBroker. For SPring.
     *
     * @param messageBroker
     *            the messageBroker to set
     */
    public void setMessageBroker(final MessageBroker messageBroker) {
        this.messageBroker = messageBroker;
    }
}

Remarque: le courtier de messagerie est défini une fois via le printemps. Cela fonctionne pour ce POC.

J'ai un servlet qui sauve un DTO à la DB et essaie ensuite de le pousser à travers le service. Tout semble correct, mais j'obtiens une NullPointerException (NPE).

Voici le journal Tomcat 6 (il envoie aux abonnés «99»):

LCDS DataMessage created :
Flex Message (flex.data.messages.DataMessage)
      operation = create_and_sequence
      id = {id=3203}
      clientId = X-BACKEND
      correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      destination = poc-ds-xchange
      messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      timestamp = 1297412881050
      timeToLive = 0
      body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending message to subscriberIds : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Known subscribers : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending LCDS DataMessage to subscriber [99]
Flex Message (flex.data.messages.DataMessage)
      operation = create_and_sequence
      id = {id=3203}
      clientId = X-BACKEND
      correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      destination = poc-ds-xchange
      messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      timestamp = 1297412881050
      timeToLive = 0
      body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
      hdr(DSDstClientId) = 99
09:28:02,456 ERROR [impl.DataPushServiceImpl] Error while sending message to subscriberId 99
java.lang.NullPointerException
    at flex.data.adapters.JavaAdapter.invokeAssemblerSync(JavaAdapter.java:1 741)
    at flex.data.adapters.JavaAdapter.invokeBatchOperation(JavaAdapter.java: 1630)
    at flex.data.adapters.JavaAdapter.invoke(JavaAdapter.java:658)
    at flex.messaging.services.MessageService.serviceMessage(MessageService. java:318)
    at flex.messaging.services.MessageService.serviceMessage(MessageService. java:233)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.doPush(DataPushSer viceImpl.java:142)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.pushTo1Client(Data PushServiceImpl.java:178)
    at mypackage.servlet.InteractionServlet.push(InteractionServlet.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker. doInvokeMethod(HandlerMethodInvoker.java:421)
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker. invokeHandlerMethod(HandlerMethodInvoker.java:136)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandle rAdapter.invokeHandlerMethod(AnnotationMethodHandlerAdapter.java:326)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandle rAdapter.handle(AnnotationMethodHandlerAdapter.java:313)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(Dispatch erServlet.java:875)
    at org.springframework.web.servlet.DispatcherServlet.doService(Dispatche rServlet.java:807)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(Frame workServlet.java:571)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServl et.java:501)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(Appl icationFilterChain.java:290)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationF ilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperV alve.java:233)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextV alve.java:175)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.j ava:128)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.j ava:102)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineVal ve.java:109)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.jav a:263)
    at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java :844)
    at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.proce ss(Http11Protocol.java:584)
    at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:44 7)
    at java.lang.Thread.run(Unknown Source)
09:28:02,472 DEBUG [impl.DataPushServiceImpl] Message sent to '99' : false

==> Qu'est-ce que je fais de mal?

Je ne peux pas retracer le code (je n'ai pas la source), mais l'exception lancée n'aide tout simplement pas du tout.

Est-ce que je manque un en-tête pour régler?

Je vous remercie beaucoup pour votre aide,

Était-ce utile?

La solution

Pour les enregistrements, je l'ai trouvé; o)

La base à savoir est que l'API DataServiceTransaction est un incontournable, si vous utilisez LC DataService.

Pour les deaux voir Fil du Forums Adobe

Pour les enregistrements ici, voici mon code de travail (de base):

/**
* ASSUMPTION : the client Flex/Air apps set the desired userId (= filter) as a fillParameter of the
* DataService.fill() method. This will filter output based on {@link AbstractDto#getUserId()}.
*/

@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);
/* *********** V2 : DataServiceTransaction.createItem() ********* */
/**
 * Does the push of a single DTO.
 *
 * @param dto
 *            {@link AbstractDto} object. Contains the {@link AbstractDto#getUserId()} that is used by clients to
 *            filter data in the DataService.fill() method (used by the Assembler).
 *
 * @throws DsPushException
 *             if any error
 */
private boolean doPushViaTransaction(final AbstractDto dto) throws DsPushException {

    if (LOG.isDebugEnabled()) {
        LOG.debug("Sending message through DataServiceTransaction (see userId field) : " + dto.toString());
    }

    // One MUST instantiate a DataServiceTransaction to be able to send anything (NullPointerException)
    DataServiceTransaction dtx = null;
    boolean isOwnerOfTx = false;
    boolean isSent = false;
    try {
        // if already in an Assembler, we do have a tx ==> no commit nor rollback!
        dtx = DataServiceTransaction.getCurrentDataServiceTransaction();
        if (dtx == null) {
            // new one, no JTA ==> ourselves : commit or rollback
            isOwnerOfTx = true;
            //MessageBroker instantiated with SpringFlex is auto-named
            dtx = DataServiceTransaction.begin("_messageBroker", false);
        }

        isSent = this.doTransactionSend(dto, dtx);

    } catch (Exception e) {
        // Log exception, but no impact on the back-end business ==> swallow Exception
        LOG.error("Could not send the creation to LCDS", e);
        if (isOwnerOfTx) {
            dtx.rollback();
        }
    } finally {
        try {
            if (isOwnerOfTx && (dtx != null)) {
                dtx.commit();
            }
        } catch (Exception e) {
            // swallow
            LOG.error("Could not send the creation to LCDS (@commit of the DataServiceTransaction)", e);
        }
    }

    return isSent;

}

private boolean doTransactionSend(final AbstractDto dto, final DataServiceTransaction dtx) {

    boolean isSent = false;

    if (dto == null) {
        LOG.error("The given DTO is null! Nothing happens");

    } else {
        try {
            dtx.createItem(FlexUtils.DESTINATION_NAME__POC_DS, dto);
            isSent = true; // no problem
        } catch (Exception e) {
            // Log exception, but no impact on the business
            LOG.error("Could not send the creation to LCDS for DTO " + dto.toString(), e);
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug("DTO : " + dto.toString() + "\n sent : " + String.valueOf(isSent));
            }
        }
    }

    return isSent;
}

//implementation of DataPushService interface
/**
 * {@inheritDoc}
 *
 * @see DataPushService#pushNewDTo(AbstractDto, java.lang.Long)
 */
@Transactional(rollbackFor = DsPushException.class)
public boolean pushNewDTo(final AbstractDto dto, final Long subscriberId) throws DsPushException {
    return this.doPushViaTransaction(dto);
}
}

Profitez et merci pour votre temps!

G.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top