Pregunta

Estoy haciendo una prueba de concepto (POC) para enviar (push) datos desde un servidor (Java) a través del DataService de LCDS 3.1 usando RTMP.

La configuración está bien. DataMessage del cliente Adobe AIR al servidor (+ el Assembler guardando en la BD): OK

Encontré muchos ejemplos con AsyncMessage, pero como este es un destino RTMP a través de un servicio DataService, debo enviar un DataMessage.

Aparentemente hay algunos errores (¡o se me está escapando algo / falta una buena documentación de la API!).

Por favor, ¿podría ayudarme?

Aquí está el código que hace el push. El método clave es doPush().

package mypackage.lcds.service.ds.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;

import mypackage.lcds.service.ds.DataPushService;
import mypackage.model.dto.AbstractDto;
import mypackage.model.exception.DsPushException;
import flex.data.messages.DataMessage;
import flex.messaging.MessageBroker;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.util.UUIDUtils;

/**
* Implementation of {@link DataPushService}.
*/
// see http://forums.adobe.com/thread/580667
// MessageCLient :
// http://livedocs.adobe.com/livecycle/8.2/programLC/programmer/lcds/help .html?content=lcconnections_2.html
@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

    /**
     * Destination name for Data-service.<br>
     * See data-management-config.XML.
     */
    private static final String DESTINATION_NAME__POC_DS_XCHANGE = "poc-ds-xchange";

    /**
     * Id of the service that is looked up on the {@link MessageBroker}.<br>
     * See data-management-config.XML.
     */
    private static final String PUSH_DTO_SERVICE__NAME = "data-service";

    /**
     * set "manually" by Spring (contexts workaround; not autowired).
     */
    private MessageBroker messageBroker = null;

    /**
     * Does the push of a single DTO.<br>
     * Only subscriberId's that are {@link Long} values will be used. Other Id's do not get a Message sent.
     * <p>
     * One {@link DataMessage} is built and re-sent to each target; only its
     * {@link Message#DESTINATION_CLIENT_ID_HEADER} header changes between sends. A failure for one subscriber is
     * logged and does not prevent the remaining subscribers from being tried.
     *
     * @param dto
     *            {@link AbstractDto} object.
     * @param subscriberIds
     *            {@link Set} of LCDS Message subscriber IDs {@link Long}. If null or empty, sends to all the
     *            subscribers currently reported by the message service.
     *
     * @throws DsPushException
     *             declared for the callers' contract; this implementation logs send failures instead of
     *             rethrowing them
     * @throws IllegalStateException
     *             if no {@link MessageBroker} has been set
     */
    @SuppressWarnings("unchecked")
    private void doPush(final AbstractDto dto, final Set<Long> subscriberIds)
            throws DsPushException {

        // obtain message service by means of message broker
        MessageService messageService = this.getMessageService();
        DataMessage message = this.createMessage(dto, messageService);

        // resolve the effective set of targets
        Set<Object> ids = new HashSet<Object>();
        if ((subscriberIds == null) || subscriberIds.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message all currently connected subscriberIds ");
            }

            Set idsFromDS = messageService.getSubscriberIds(message, true);
            if (idsFromDS != null) {
                ids.addAll(idsFromDS);
            }
        } else {
            ids.addAll(subscriberIds);
        }

        if (ids.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No subscriberId to send the Message to.");
                // plain concatenation: the lookup result may be null
                LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true));
            }
            return;
        }

        if (LOG.isDebugEnabled()) {
            // log the resolved ids: the subscriberIds argument is null for a push to all clients
            LOG.debug("Sending message to subscriberIds : " + ids);
            LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true));
        }

        // send messages to all subscriberIds 1 by 1
        for (Object id : ids) {
            if (!(id instanceof Long)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Avoiding subscriber ID (not a Long value) : " + String.valueOf(id));
                }
                continue;
            }

            boolean isSent = false;
            try {
                message.setHeader(Message.DESTINATION_CLIENT_ID_HEADER, id);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending LCDS DataMessage to subscriber [" + id + "] \n" + message.toString(2));
                }
                // TODO TEST returned payload; for now "no exception" is treated as success
                messageService.serviceMessage(message, true);
                isSent = true;
            } catch (Exception e) {
                LOG.error("Error while sending message to subscriberId " + id, e);
            } finally {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Message sent to '" + String.valueOf(id) + "' : " + String.valueOf(isSent));
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see DataPushService#pushToAllClients(AbstractDto)
     */
    // TODO test : if client is not connected, does LCDS record it for later (offline mode on the server?)
    public void pushToAllClients(final AbstractDto dto) throws DsPushException {
        this.doPush(dto, null);
    }

    /**
     * Pushes the DTO to a single subscriber.
     *
     * @param dto
     *            {@link AbstractDto} object to push
     * @param subscriberId
     *            id of the only subscriber that should receive the message
     * @throws DsPushException
     *             see {@link #pushToClients(AbstractDto, Set)}
     */
    public void pushTo1Client(AbstractDto dto, Long subscriberId) throws DsPushException {
        Set<Long> subscriberIds = new HashSet<Long>();
        subscriberIds.add(subscriberId);

        this.doPush(dto, subscriberIds);
    }

    /**
     * {@inheritDoc}<br>
     * subscriberIds refer to the 'clientId' set by the client app when it subscribes to the DS destination.
     *
     * @see DataPushService#pushToClients(AbstractDto, Set)
     */
    public void pushToClients(final AbstractDto dto, final Set<Long> subscriberIds) throws DsPushException {
        this.doPush(dto, subscriberIds);
    }

    /**
     * Builds the {@link DataMessage} carrying the DTO as body, addressed to
     * {@link #DESTINATION_NAME__POC_DS_XCHANGE}.
     *
     * @param dto
     *            DTO to send; its id is used as the message identity
     * @param messageService
     *            currently unused; kept so the signature stays unchanged
     * @return the populated message (the destination-client header is set later, per subscriber)
     */
    private DataMessage createMessage(final AbstractDto dto, final MessageService messageService) {
        DataMessage msg = new DataMessage();
        msg.setClientId(getServerId());
        msg.setTimestamp(System.currentTimeMillis());
        msg.setMessageId(UUIDUtils.createUUID(true));
        msg.setCorrelationId(msg.getMessageId()); // TODO OK messageId == CorrelationId ?
        msg.setDestination(DESTINATION_NAME__POC_DS_XCHANGE);
        msg.setBody(dto);
        msg.setOperation(DataMessage.CREATE_AND_SEQUENCE_OPERATION); // TODO OK operation?

        // identity property name: see data-management-config.xml
        Map<String, Object> identity = new HashMap<String, Object>(2);
        identity.put("id", dto.getId());
        msg.setIdentity(identity);

        // FIXME set priority. How?
        if (LOG.isDebugEnabled()) {
            LOG.debug("LCDS DataMessage created : \n" + msg.toString(2));
        }
        return msg;
    }

    /**
     * @return the value used as clientId of the messages created by this service
     */
    private Object getServerId() {
        // FIXME OK?
        return "X-BACKEND";
    }

    /**
     * Get the current {@link MessageBroker}'s service layer.
     *
     * @return {@link MessageService} to use for push data
     * @throws IllegalStateException
     *             if {@link #setMessageBroker(MessageBroker)} has not been called yet
     */
    private MessageService getMessageService() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting MessageBroker's DataService service ");
        }

        if (this.messageBroker == null) {
            // fail with an explicit message instead of an anonymous NullPointerException
            throw new IllegalStateException("No MessageBroker set on DataPushServiceImpl");
        }
        return (MessageService) this.messageBroker.getService(PUSH_DTO_SERVICE__NAME);
    }

    /**
     * Set the messageBroker. For Spring.
     *
     * @param messageBroker
     *            the messageBroker to set
     */
    public void setMessageBroker(final MessageBroker messageBroker) {
        this.messageBroker = messageBroker;
    }
}

Nota: el MessageBroker se establece una sola vez a través de Spring. Funciona para este POC.

Tengo un servlet que guarda un DTO en la BD y luego intenta enviarlo (push) a través del servicio. Todo parece estar bien, pero obtengo una NullPointerException (NPE).

Aquí está el registro de Tomcat 6 (se envía al suscriptor '99'):

LCDS DataMessage created :
Flex Message (flex.data.messages.DataMessage)
      operation = create_and_sequence
      id = {id=3203}
      clientId = X-BACKEND
      correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      destination = poc-ds-xchange
      messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      timestamp = 1297412881050
      timeToLive = 0
      body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending message to subscriberIds : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Known subscribers : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending LCDS DataMessage to subscriber [99]
Flex Message (flex.data.messages.DataMessage)
      operation = create_and_sequence
      id = {id=3203}
      clientId = X-BACKEND
      correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      destination = poc-ds-xchange
      messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      timestamp = 1297412881050
      timeToLive = 0
      body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
      hdr(DSDstClientId) = 99
09:28:02,456 ERROR [impl.DataPushServiceImpl] Error while sending message to subscriberId 99
java.lang.NullPointerException
    at flex.data.adapters.JavaAdapter.invokeAssemblerSync(JavaAdapter.java:1741)
    at flex.data.adapters.JavaAdapter.invokeBatchOperation(JavaAdapter.java:1630)
    at flex.data.adapters.JavaAdapter.invoke(JavaAdapter.java:658)
    at flex.messaging.services.MessageService.serviceMessage(MessageService.java:318)
    at flex.messaging.services.MessageService.serviceMessage(MessageService.java:233)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.doPush(DataPushServiceImpl.java:142)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.pushTo1Client(DataPushServiceImpl.java:178)
    at mypackage.servlet.InteractionServlet.push(InteractionServlet.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker. doInvokeMethod(HandlerMethodInvoker.java:421)
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker. invokeHandlerMethod(HandlerMethodInvoker.java:136)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandle rAdapter.invokeHandlerMethod(AnnotationMethodHandlerAdapter.java:326)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandle rAdapter.handle(AnnotationMethodHandlerAdapter.java:313)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(Dispatch erServlet.java:875)
    at org.springframework.web.servlet.DispatcherServlet.doService(Dispatche rServlet.java:807)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(Frame workServlet.java:571)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServl et.java:501)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(Appl icationFilterChain.java:290)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationF ilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperV alve.java:233)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextV alve.java:175)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.j ava:128)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.j ava:102)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineVal ve.java:109)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.jav a:263)
    at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java :844)
    at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.proce ss(Http11Protocol.java:584)
    at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:44 7)
    at java.lang.Thread.run(Unknown Source)
09:28:02,472 DEBUG [impl.DataPushServiceImpl] Message sent to '99' : false

==> ¿Qué estoy haciendo mal?

No puedo rastrear el código (no tengo la fuente), pero la excepción lanzada simplemente no está ayudando en absoluto.

¿Estoy perdiendo un encabezado para establecer?

Muchas gracias por su ayuda,

¿Fue útil?

Solución

Para que conste, encontré la solución ;o)

Lo fundamental es saber que la API DataServiceTransaction es imprescindible si se utiliza el DataService de LCDS.

Para más detalles, véase el hilo de los foros de Adobe.

Para que conste, aquí está mi código funcional (básico):

/**
* ASSUMPTION : the client Flex/Air apps set the desired userId (= filter) as a fillParameter of the
* DataService.fill() method. This will filter output based on {@link AbstractDto#getUserId()}.
*/

@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

    /* *********** V2 : DataServiceTransaction.createItem() ********* */

    /**
     * Does the push of a single DTO.
     * <p>
     * If a {@link DataServiceTransaction} is already active on the current thread (e.g. when called from an
     * Assembler) it is reused and neither committed nor rolled back here. Otherwise a new non-JTA transaction is
     * begun and this method owns it: it is committed when the item was created successfully and rolled back in
     * every other case.
     *
     * @param dto
     *            {@link AbstractDto} object. Contains the {@link AbstractDto#getUserId()} that is used by clients to
     *            filter data in the DataService.fill() method (used by the Assembler).
     * @return {@code true} if the item was handed to LCDS and, for an owned transaction, the commit succeeded;
     *         {@code false} otherwise (failures are logged, not rethrown)
     *
     * @throws DsPushException
     *             declared for the callers' contract; failures are deliberately swallowed so that a push problem
     *             has no impact on the back-end business
     */
    private boolean doPushViaTransaction(final AbstractDto dto) throws DsPushException {

        if (dto == null) {
            // checked first so that the debug log below cannot dereference null and no transaction is opened
            LOG.error("The given DTO is null! Nothing happens");
            return false;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message through DataServiceTransaction (see userId field) : " + dto.toString());
        }

        // One MUST instantiate a DataServiceTransaction to be able to send anything (NullPointerException)
        DataServiceTransaction dtx = null;
        boolean isOwnerOfTx = false;
        boolean isSent = false;
        try {
            // if already in an Assembler, we do have a tx ==> no commit nor rollback!
            dtx = DataServiceTransaction.getCurrentDataServiceTransaction();
            if (dtx == null) {
                // new one, no JTA ==> ourselves : commit or rollback
                // MessageBroker instantiated with SpringFlex is auto-named
                dtx = DataServiceTransaction.begin("_messageBroker", false);
                // ownership is recorded only once begin() has actually succeeded
                isOwnerOfTx = (dtx != null);
            }

            isSent = this.doTransactionSend(dto, dtx);

        } catch (Exception e) {
            // Log exception, but no impact on the back-end business ==> swallow Exception
            LOG.error("Could not send the creation to LCDS", e);
            isSent = false;
        } finally {
            if (isOwnerOfTx) {
                isSent = this.completeOwnedTransaction(dtx, isSent);
            }
        }

        return isSent;
    }

    /**
     * Ends a transaction begun by this service: commit on success, rollback on failure (never both).
     *
     * @param dtx
     *            the owned transaction, not null
     * @param isSent
     *            whether the item was created successfully inside the transaction
     * @return {@code true} only if {@code isSent} was true and the commit did not throw
     */
    private boolean completeOwnedTransaction(final DataServiceTransaction dtx, final boolean isSent) {
        if (isSent) {
            try {
                dtx.commit();
                return true;
            } catch (Exception e) {
                // swallow
                LOG.error("Could not send the creation to LCDS (@commit of the DataServiceTransaction)", e);
                return false;
            }
        }

        try {
            dtx.rollback();
        } catch (Exception e) {
            // swallow
            LOG.error("Could not roll back the DataServiceTransaction", e);
        }
        return false;
    }

    /**
     * Creates the item on the LCDS destination within the given transaction.
     *
     * @param dto
     *            DTO to create; if null nothing is sent
     * @param dtx
     *            transaction to use
     * @return {@code true} if {@code createItem} completed without exception
     */
    private boolean doTransactionSend(final AbstractDto dto, final DataServiceTransaction dtx) {

        if (dto == null) {
            LOG.error("The given DTO is null! Nothing happens");
            return false;
        }

        boolean isSent = false;
        try {
            dtx.createItem(FlexUtils.DESTINATION_NAME__POC_DS, dto);
            isSent = true; // no problem
        } catch (Exception e) {
            // Log exception, but no impact on the business
            LOG.error("Could not send the creation to LCDS for DTO " + dto.toString(), e);
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug("DTO : " + dto.toString() + "\n sent : " + String.valueOf(isSent));
            }
        }

        return isSent;
    }

    // implementation of DataPushService interface
    /**
     * {@inheritDoc}
     * <p>
     * Note: {@code subscriberId} is not used by this implementation; the DTO is pushed through the
     * DataServiceTransaction regardless of its value.
     *
     * @see DataPushService#pushNewDTo(AbstractDto, java.lang.Long)
     */
    @Transactional(rollbackFor = DsPushException.class)
    public boolean pushNewDTo(final AbstractDto dto, final Long subscriberId) throws DsPushException {
        return this.doPushViaTransaction(dto);
    }
}

¡Que lo disfruten y gracias por su tiempo!

g.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top