Java Push DataMessage through Adobe RTMP LCDS DataService

1.2k views Asked by At

I'm doing a POC to push data from a (Java) server through LCDS 3.1's DataService using RTMP.

Configuration is OK. Adobe Air client DataMessage to server (+Assembler saving in DB) : OK

I found lots of examples with AsyncMessage, but as this is an RTMP destination through a DataService service, I must send a DataMessage.

Apparently, there are some bugs (or I am missing something, such as good API documentation!).

So please, could you help me?

Here is the code that does the push. The key method is doPush()

package mypackage.lcds.service.ds.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;

import mypackage.lcds.service.ds.DataPushService;
import mypackage.model.dto.AbstractDto;
import mypackage.model.exception.DsPushException;
import flex.data.messages.DataMessage;
import flex.messaging.MessageBroker;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.util.UUIDUtils;

/**
 * Implementation of {@link DataPushService} that pushes a DTO to subscribed clients by building a
 * {@link DataMessage} and handing it to the broker's service one subscriber at a time.
 * <p>
 * NOTE(review): calling {@code serviceMessage} on a Data Management destination outside of a
 * {@code DataServiceTransaction} has been observed to fail with a {@link NullPointerException} inside
 * {@code flex.data.adapters.JavaAdapter}; {@code DataServiceTransaction.createItem(...)} is reported as the
 * working alternative. Confirm before relying on this class.
 */
// see http://forums.adobe.com/thread/580667
// MessageClient :
// http://livedocs.adobe.com/livecycle/8.2/programLC/programmer/lcds/help.html?content=lcconnections_2.html
@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

    /**
     * Destination name for Data-service.<br>
     * See data-management-config.XML.
     */
    private static final String DESTINATION_NAME__POC_DS_XCHANGE = "poc-ds-xchange";

    /**
     * Id of the service looked up on the {@link MessageBroker}.<br>
     * See data-management-config.XML.
     */
    private static final String PUSH_DTO_SERVICE__NAME = "data-service";

    /**
     * set "manually" by Spring (contexts workaround; not autowired).
     */
    private MessageBroker messageBroker = null;

    /**
     * Does the push of a single DTO.<br>
     * Only subscriberId's that are {@link Long} values will be used. Other Id's do not get a Message sent.
     * A failure for one subscriber is logged and does not prevent the remaining subscribers from being served.
     *
     * @param dto
     *            {@link AbstractDto} object.
     * @param subscriberIds
     *            {@link Set} of LCDS Message subscriber IDs {@link Long}. If null or empty, sends to all
     *            currently connected clients.
     *
     * @throws DsPushException
     *             declared for the callers' contract; this implementation logs send errors instead of
     *             rethrowing them
     * @throws IllegalStateException
     *             if the {@link MessageBroker} was not injected or the service cannot be found
     */
    private void doPush(final AbstractDto dto, final Set<Long> subscriberIds)
            throws DsPushException {

        // obtain message service by means of message broker
        MessageService messageService = this.getMessageService();

        DataMessage message = this.createMessage(dto);

        // fill ids
        Set<Object> ids = new HashSet<Object>();
        if ((subscriberIds == null) || subscriberIds.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message all currently connected subscriberIds ");
            }

            Set<?> idsFromDS = messageService.getSubscriberIds(message, true);
            if (idsFromDS != null) {
                ids.addAll(idsFromDS);
            }
        } else {
            ids.addAll(subscriberIds);
        }

        if (ids.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No subscriberId to send the Message to.");
                // plain concatenation: stays safe if the service returns null
                LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true));
            }
            return;
        }

        if (LOG.isDebugEnabled()) {
            // log the resolved ids: subscriberIds is null when pushing to all clients
            LOG.debug("Sending message to subscriberIds : " + ids);
            LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true));
        }

        // send messages to all subscriberIds 1 by 1
        for (Object id : ids) {
            if (id instanceof Long) {
                this.sendToSubscriber(messageService, message, id);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Avoiding subscriber ID (not a Long value) : " + String.valueOf(id));
            }
        }
    }

    /**
     * Addresses the (shared) message to one subscriber and sends it. Errors are logged, never propagated.
     *
     * @param messageService
     *            service used to route the message
     * @param message
     *            message to send; its destination-client header is overwritten with {@code id}
     * @param id
     *            subscriber id to address
     * @return {@code true} if the service call completed without throwing
     */
    private boolean sendToSubscriber(final MessageService messageService, final DataMessage message,
            final Object id) {
        boolean isSent = false;
        try {
            message.setHeader(Message.DESTINATION_CLIENT_ID_HEADER, id);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending LCDS DataMessage to subscriber [" + id + "] \n" + message.toString(2));
            }
            messageService.serviceMessage(message, true);

            // no exception ==> means OK?
            // TODO TEST returned payload
            isSent = true;

        } catch (Exception e) {
            LOG.error("Error while sending message to subscriberId " + id, e);
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Message sent to '" + String.valueOf(id) + "' : " + String.valueOf(isSent));
            }
        }
        return isSent;
    }

    /**
     * {@inheritDoc}
     *
     * @see DataPushService#pushToAllClients(AbstractDto)
     */
    // TODO test : if client is not connected, does LCDS record it for later (offline mode on the server?)
    public void pushToAllClients(final AbstractDto dto) throws DsPushException {
        this.doPush(dto, null);
    }

    /**
     * Pushes the DTO to a single subscriber.
     *
     * @param dto
     *            {@link AbstractDto} object to push
     * @param subscriberId
     *            id of the only subscriber to address
     * @throws DsPushException
     *             see {@link #pushToClients(AbstractDto, Set)}
     */
    public void pushTo1Client(AbstractDto dto, Long subscriberId) throws DsPushException {
        Set<Long> subscriberIds = new HashSet<Long>();
        subscriberIds.add(subscriberId);

        this.doPush(dto, subscriberIds);
    }

    /**
     * {@inheritDoc}<br>
     * subscriberIds refer to the 'clientId' set by the client app when it subscribes to the DS destination.
     *
     * @see DataPushService#pushToClients(AbstractDto, Set)
     */
    public void pushToClients(final AbstractDto dto, final Set<Long> subscriberIds) throws DsPushException {
        this.doPush(dto, subscriberIds);
    }

    /**
     * Builds the {@link DataMessage} carrying the DTO as body, targeted at
     * {@link #DESTINATION_NAME__POC_DS_XCHANGE}, with the DTO id as identity.
     *
     * @param dto
     *            DTO to wrap; must not be null
     * @return the populated message (no destination-client header set yet)
     */
    private DataMessage createMessage(final AbstractDto dto) {
        DataMessage msg = new DataMessage();
        msg.setClientId(getServerId());
        msg.setTimestamp(System.currentTimeMillis());
        msg.setMessageId(UUIDUtils.createUUID(true));
        msg.setCorrelationId(msg.getMessageId()); // TODO OK messageId == CorrelationId ?
        msg.setDestination(DESTINATION_NAME__POC_DS_XCHANGE);
        msg.setBody(dto);
        msg.setOperation(DataMessage.CREATE_AND_SEQUENCE_OPERATION); // TODO OK operation?

        Map<String, Object> identity = new HashMap<String, Object>(2);
        // see data-management-config.xml
        identity.put("id", dto.getId());
        msg.setIdentity(identity);

        // FIXME set priority. How?
        if (LOG.isDebugEnabled()) {
            LOG.debug("LCDS DataMessage created : \n" + msg.toString(2));
        }
        return msg;
    }

    /**
     * @return the client id stamped on messages originating from this back-end
     */
    private Object getServerId() {
        // FIXME OK?
        return "X-BACKEND";
    }

    /**
     * Get the current {@link MessageBroker}'s service layer.
     *
     * @return {@link MessageService} to use for push data
     * @throws IllegalStateException
     *             if the broker has not been injected or exposes no service named
     *             {@link #PUSH_DTO_SERVICE__NAME}
     */
    private MessageService getMessageService() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting MessageBroker's DataService service ");
        }

        if (this.messageBroker == null) {
            throw new IllegalStateException("MessageBroker has not been set on DataPushServiceImpl");
        }

        // Was : return (MessageService) MessageBroker.getMessageBroker(null).getService(PUSH_DTO_SERVICE__NAME);
        MessageService service = (MessageService) this.messageBroker.getService(PUSH_DTO_SERVICE__NAME);
        if (service == null) {
            throw new IllegalStateException("No service '" + PUSH_DTO_SERVICE__NAME
                    + "' registered on the MessageBroker");
        }
        return service;
    }

    /**
     * Set the messageBroker. For Spring.
     *
     * @param messageBroker
     *            the messageBroker to set
     */
    public void setMessageBroker(final MessageBroker messageBroker) {
        this.messageBroker = messageBroker;
    }
}

NOTE : the messagebroker is set once through Spring. It works for this POC.

I have a Servlet that saves a DTO to the DB and then tries to push it through the service. All seems OK, but I get a NullPointerException (NPE).

Here is the Tomcat 6 LOG (it sends to subscriberID '99' ):

LCDS DataMessage created :
Flex Message (flex.data.messages.DataMessage)
      operation = create_and_sequence
      id = {id=3203}
      clientId = X-BACKEND
      correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      destination = poc-ds-xchange
      messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      timestamp = 1297412881050
      timeToLive = 0
      body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending message to subscriberIds : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Known subscribers : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending LCDS DataMessage to subscriber [99]
Flex Message (flex.data.messages.DataMessage)
      operation = create_and_sequence
      id = {id=3203}
      clientId = X-BACKEND
      correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      destination = poc-ds-xchange
      messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
      timestamp = 1297412881050
      timeToLive = 0
      body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
      hdr(DSDstClientId) = 99
09:28:02,456 ERROR [impl.DataPushServiceImpl] Error while sending message to subscriberId 99
java.lang.NullPointerException
    at flex.data.adapters.JavaAdapter.invokeAssemblerSync(JavaAdapter.java:1741)
    at flex.data.adapters.JavaAdapter.invokeBatchOperation(JavaAdapter.java:1630)
    at flex.data.adapters.JavaAdapter.invoke(JavaAdapter.java:658)
    at flex.messaging.services.MessageService.serviceMessage(MessageService.java:318)
    at flex.messaging.services.MessageService.serviceMessage(MessageService.java:233)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.doPush(DataPushServiceImpl.java:142)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.pushTo1Client(DataPushServiceImpl.java:178)
    at mypackage.servlet.InteractionServlet.push(InteractionServlet.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker.doInvokeMethod(HandlerMethodInvoker.java:421)
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker.invokeHandlerMethod(HandlerMethodInvoker.java:136)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter.invokeHandlerMethod(AnnotationMethodHandlerAdapter.java:326)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter.handle(AnnotationMethodHandlerAdapter.java:313)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:875)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:807)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:571)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:501)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:263)
    at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844)
    at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:584)
    at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447)
    at java.lang.Thread.run(Unknown Source)
09:28:02,472 DEBUG [impl.DataPushServiceImpl] Message sent to '99' : false

==> what am I doing wrong?

I cannot trace the code (I do not have the source), but the exception thrown is just not helping at all.

Am I missing a header to set?

Thank you so much for your help,

1

There is 1 answer

0
Elnourso On BEST ANSWER

For the record, I found it ;o)

The key thing to know is that the DataServiceTransaction API is a MUST-HAVE if you're using the LC DataService.

For details, see the Adobe forums thread.

For the record, here is my working (basic) code:

/**
 * Implementation of {@link DataPushService} that publishes new DTOs through the LCDS
 * {@link DataServiceTransaction} API.
 * <p>
 * ASSUMPTION : the client Flex/Air apps set the desired userId (= filter) as a fillParameter of the
 * DataService.fill() method. This will filter output based on {@link AbstractDto#getUserId()}.
 */
@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

    /* *********** V2 : DataServiceTransaction.createItem() ********* */
    /**
     * Does the push of a single DTO.
     * <p>
     * If a {@link DataServiceTransaction} is already bound to the current thread (e.g. when called from an
     * Assembler) it is reused and left untouched. Otherwise a new non-JTA transaction is begun, then
     * committed when the item was created and rolled back when it was not. Failures are logged and reported
     * through the return value so they have no impact on the back-end business.
     *
     * @param dto
     *            {@link AbstractDto} object. Contains the {@link AbstractDto#getUserId()} that is used by clients to
     *            filter data in the DataService.fill() method (used by the Assembler).
     * @return {@code true} if the item was created and, when this method owns the transaction, the
     *         transaction was committed; {@code false} otherwise (including a null DTO)
     *
     * @throws DsPushException
     *             declared for the callers' contract; this implementation logs errors instead of
     *             rethrowing them
     */
    private boolean doPushViaTransaction(final AbstractDto dto) throws DsPushException {

        if (dto == null) {
            // checked up-front so that no transaction is opened for nothing
            LOG.error("The given DTO is null! Nothing happens");
            return false;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message through DataServiceTransaction (see userId field) : " + dto);
        }

        // One MUST instantiate a DataServiceTransaction to be able to send anything (NullPointerException)
        DataServiceTransaction dtx = null;
        boolean isOwnerOfTx = false;
        boolean isSent = false;
        try {
            // if already in an Assembler, we do have a tx ==> no commit nor rollback!
            dtx = DataServiceTransaction.getCurrentDataServiceTransaction();
            if (dtx == null) {
                // new one, no JTA ==> ourselves : commit or rollback
                // MessageBroker instantiated with SpringFlex is auto-named
                dtx = DataServiceTransaction.begin("_messageBroker", false);
                // only claim ownership once begin() has actually succeeded
                isOwnerOfTx = true;
            }

            isSent = this.doTransactionSend(dto, dtx);

        } catch (Exception e) {
            // Log exception, but no impact on the back-end business ==> swallow Exception
            LOG.error("Could not send the creation to LCDS", e);
        } finally {
            if (isOwnerOfTx) {
                // exactly one of commit/rollback; a failed commit means nothing was sent
                isSent = this.completeOwnedTransaction(dtx, isSent);
            }
        }

        return isSent;
    }

    /**
     * Ends a transaction begun by this class: commits when the send succeeded, rolls back otherwise.
     * Errors raised while ending the transaction are logged and swallowed.
     *
     * @param dtx
     *            transaction to end; may be null, in which case nothing is done
     * @param isSent
     *            whether the item was successfully created within {@code dtx}
     * @return {@code true} only if {@code isSent} was true and the commit completed without throwing
     */
    private boolean completeOwnedTransaction(final DataServiceTransaction dtx, final boolean isSent) {
        if (dtx == null) {
            return false;
        }

        try {
            if (isSent) {
                dtx.commit();
                return true;
            }
            dtx.rollback();
        } catch (Exception e) {
            // swallow
            if (isSent) {
                LOG.error("Could not send the creation to LCDS (@commit of the DataServiceTransaction)", e);
            } else {
                LOG.error("Could not roll back the DataServiceTransaction", e);
            }
        }
        return false;
    }

    /**
     * Creates the item on the Data Management destination within the given transaction.
     *
     * @param dto
     *            DTO to publish; a null value is logged and ignored
     * @param dtx
     *            transaction to create the item in
     * @return {@code true} if {@code createItem} completed without throwing
     */
    private boolean doTransactionSend(final AbstractDto dto, final DataServiceTransaction dtx) {

        boolean isSent = false;

        if (dto == null) {
            LOG.error("The given DTO is null! Nothing happens");

        } else {
            try {
                dtx.createItem(FlexUtils.DESTINATION_NAME__POC_DS, dto);
                isSent = true; // no problem
            } catch (Exception e) {
                // Log exception, but no impact on the business
                LOG.error("Could not send the creation to LCDS for DTO " + dto.toString(), e);
            } finally {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("DTO : " + dto.toString() + "\n sent : " + String.valueOf(isSent));
                }
            }
        }

        return isSent;
    }

    // implementation of DataPushService interface
    /**
     * {@inheritDoc}
     * <p>
     * NOTE: {@code subscriberId} is not used by this implementation; the DTO is handed to the destination
     * as-is.
     *
     * @see DataPushService#pushNewDTo(AbstractDto, java.lang.Long)
     */
    @Transactional(rollbackFor = DsPushException.class)
    public boolean pushNewDTo(final AbstractDto dto, final Long subscriberId) throws DsPushException {
        return this.doPushViaTransaction(dto);
    }
}

Enjoy and thank you for your time!

G.