storm java.io.NotSerializableException

大概有两年没碰 Storm 了，今天因为工作需要又重新用了一次，

不过这次需要使用最新版本的 Storm，我用的是 1.2.3 版本。

程序写完后一运行就报错，错误信息如下：

13886 [main] INFO  o.a.s.d.s.Supervisor - Starting supervisor with id 87f0e450-46bf-4545-b86f-2a9f961ad24d at host vm1.
Exception in thread "main" java.lang.IllegalStateException: Spout 'ThingSpout' contains a non-serializable field of type rexel.topo.ThingSpout$$Lambda$1/109961541, which was instantiated prior to topology creation. rexel.topo.ThingSpout$$Lambda$1/109961541 should be instantiated within the prepare method of 'ThingSpout at the earliest.
	at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:143)
	at rexel.topo.MainTopology.main(MainTopology.java:34)
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: rexel.topo.ThingSpout$$Lambda$1/109961541
	at org.apache.storm.utils.Utils.javaSerialize(Utils.java:240)
	at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:138)
	... 1 more
Caused by: java.io.NotSerializableException: rexel.topo.ThingSpout$$Lambda$1/109961541
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.storm.utils.Utils.javaSerialize(Utils.java:236)
	... 2 more

工程结构如下:

MainTopology代码如下:

package rexel.topo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import rexel.bean.PropertiesBean;
import rexel.utils.PropertiesUtils;

/**
 * Entry point that wires {@code ThingSpout} to {@code ThingBolt} and submits the
 * resulting topology under the name "MainTopology".
 *
 * <p>Usage: pass {@code true} as the first argument to submit through
 * {@link StormSubmitter}; pass anything else, or no argument at all, to run the
 * topology in an in-process {@link LocalCluster}.
 */
public class MainTopology {
    private static PropertiesUtils propertiesUtils = PropertiesUtils.getInstance();

    /**
     * Builds the topology and submits it.
     *
     * @param args optional; {@code args[0]} is parsed with
     *             {@link Boolean#parseBoolean(String)} and selects cluster submission
     *             when it is "true" (case-insensitive). A missing argument selects
     *             the local cluster instead of failing.
     * @throws Exception propagated from topology creation or submission
     */
    public static void main(String[] args) throws Exception {
        // Abort when readProperties() yields no configuration bean.
        PropertiesBean propertiesBean = propertiesUtils.readProperties();
        if (propertiesBean == null) {
            return;
        }

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("ThingSpout", new ThingSpout(), 1);
        builder.setBolt("ThingBolt", new ThingBolt(), 1).shuffleGrouping("ThingSpout");

        Config config = new Config();
        config.setNumWorkers(1);
        config.setMessageTimeoutSecs(60);
        config.setMaxSpoutPending(100);
        config.setNumAckers(1);

        // Guard the index: the original read args[0] unconditionally and crashed with
        // ArrayIndexOutOfBoundsException when the program was started without arguments.
        boolean submitToCluster = args.length > 0 && Boolean.parseBoolean(args[0]);

        if (submitToCluster) {
            StormSubmitter.submitTopology("MainTopology", config, builder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("MainTopology", config, builder.createTopology());
        }
    }
}

ThingSpout代码如下:

package rexel.topo;

import java.net.URI;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import rexel.bean.PropertiesBean;
import rexel.utils.CommonUtils;
import rexel.utils.PropertiesUtils;

/**
 * Storm spout that opens a qpid-jms (AMQP) connection in {@link #open}, buffers every
 * received message body in a bounded queue, and emits each body as a one-field tuple
 * named "body".
 *
 * <p>NOTE(review): {@code messageListener} and {@code myJmsConnectionListener} are
 * instance fields that are initialised when the spout object is constructed. The spout
 * instance is Java-serialized when the topology is created, and the lambda held by
 * {@code messageListener} is not serializable; the anonymous
 * {@code JmsConnectionListener} is presumably affected the same way - confirm.
 */
public class ThingSpout extends BaseRichSpout {
    // Helper used by open() to read the connection settings.
    private static PropertiesUtils propertiesUtils = PropertiesUtils.getInstance();
    private static final long serialVersionUID = 1L;
    // Hand-off buffer: messageListener puts message bodies, nextTuple() takes them.
    // Assigned in open(). Static, so it is shared by every ThingSpout in the same JVM.
    private static LinkedBlockingQueue<byte[]> queue;
    // Collector handed over by Storm in open(); also static.
    private static SpoutOutputCollector collector = null;

    /**
     * Reads the connection properties, builds the AMQP credentials and connection URL,
     * then connects and registers {@code messageListener} as the consumer callback.
     *
     * <p>Returns early, without assigning {@code collector} or {@code queue}, when
     * {@code readProperties()} yields {@code null}. Any exception raised while
     * connecting is printed and otherwise ignored.
     *
     * @param map                  topology configuration (unused here)
     * @param topologyContext      topology context (unused here)
     * @param spoutOutputCollector collector stored for use by {@link #nextTuple}
     */
    @Override
    public void open(Map map, TopologyContext topologyContext,
        SpoutOutputCollector spoutOutputCollector) {
        PropertiesBean propertiesBean = propertiesUtils.readProperties();
        if (propertiesBean == null) {
            return;
        }

        collector = spoutOutputCollector;
        // Bounded to 1000 entries; a put() on a full queue blocks the producer.
        queue = new LinkedBlockingQueue<>(1000);

        // For parameter details, see the document "AMQP client access instructions".
        String accessKey = propertiesBean.getAccessKey();
        String accessSecret = propertiesBean.getAccessSecret();
        String uid = propertiesBean.getUid();
        String regionId = propertiesBean.getRegionId();

        String consumerGroupId = "AsgdwvkMT3ygC2IwT9GD000100";
        long timeStamp = System.currentTimeMillis();

        // Signature method: hmacmd5, hmacsha1 and hmacsha256 are supported.
        String signMethod = "hmacsha1";
        // The clientId parameter is shown in the "client ID" column of the consumer group
        // status page under server-side subscription in the console.
        // It is recommended to use a unique identifier such as the machine UUID, MAC address
        // or IP as clientId, which makes it easier to tell different clients apart.
        String clientId = CommonUtils.getDeviceUnique();

        // For how userName is assembled, see the document "AMQP client access instructions".
        String userName = clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",consumerGroupId=" + consumerGroupId
            + "|";
        // For how password is assembled, see the document "AMQP client access instructions".
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        // Assemble the connection URL according to the qpid-jms specification.
        String connectionUrl = "failover:(amqps://" + uid + ".iot-amqp." + regionId + ".aliyuncs.com:5671?amqp.idleTimeout=80000)"
            + "?failover.reconnectDelay=30";

        // JNDI environment: one connection factory ("SBCF") and one queue ("QUEUE").
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");

        try {
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
            // NOTE(review): this local "queue" (a JMS Destination) shadows the static
            // LinkedBlockingQueue field of the same name for the rest of the try block.
            Destination queue = (Destination)context.lookup("QUEUE");
            // Create Connection
            Connection connection = cf.createConnection(userName, password);
            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
            // Create Session
            // Session.CLIENT_ACKNOWLEDGE: after receiving a message, message.acknowledge() must be called manually.
            // Session.AUTO_ACKNOWLEDGE: the SDK acknowledges automatically (recommended).
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            // Create Receiver Link
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Takes the next buffered message body and emits it as a single-value tuple, using a
     * freshly generated random UUID string as the message id.
     *
     * <p>{@code queue.take()} blocks until a body is available. Any exception is printed
     * and swallowed.
     *
     * <p>NOTE(review): Storm's spout contract asks for nextTuple() to be non-blocking -
     * confirm whether take() should be a poll().
     */
    @Override
    public void nextTuple() {
        try {
            byte[] body = queue.take();
            String uuid = UUID.randomUUID().toString();
            collector.emit(new Values((Object) body), uuid);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Declares the single output field "body".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("body"));
    }

    // JMS callback: logs topic, messageId and content of each message, then puts the raw
    // body on the shared queue. The content string is decoded without an explicit charset.
    // NOTE(review): instance field holding a lambda - see the class-level note.
    private MessageListener messageListener = message -> {
        try {
            byte[] body = message.getBody(byte[].class);
            String content = new String(body);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            System.out.println("receive message"
                + ", topic = " + topic + ", msgId = " + messageId + ", content = " + content);
            queue.put(body);
        } catch (Exception e) {
            e.printStackTrace();
        }
    };

    // Connection-state callback that only prints the events it receives.
    // NOTE(review): instance field holding an anonymous class - see the class-level note.
    private JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * The connection was established successfully.
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            // NOTE(review): the literal below contains "{}" where the sibling handlers
            // use "{" - looks like a typo in the log text; confirm.
            System.out.println("onConnectionEstablished, remoteUri:{}" + remoteURI + "}");
        }

        /**
         * The connection ultimately failed after the maximum number of retries.
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            System.out.println("onConnectionFailure, {" + error.getMessage() + "}");
        }

        /**
         * The connection was interrupted.
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            System.out.println("onConnectionInterrupted, remoteUri:{" + remoteURI + "}");
        }

        /**
         * The connection was automatically restored after an interruption.
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            System.out.println("onConnectionRestored, remoteUri:{" + remoteURI + "}");
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    /**
     * Computes a MAC over {@code toSignString} keyed with {@code secret} and returns it
     * Base64-encoded.
     *
     * <p>Both strings are converted to bytes without an explicit charset. If the
     * algorithm is unknown or the key is rejected, the exception is printed and
     * {@code rawHmac} stays {@code null}; the return value then depends on how
     * {@code Base64.encodeBase64String} treats a null argument.
     *
     * @param toSignString text to sign
     * @param secret       key material
     * @param signMethod   algorithm name passed to {@code SecretKeySpec} and {@code Mac.getInstance}
     * @return Base64 text of the MAC
     */
    private String doSign(String toSignString, String secret, String signMethod) {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        byte[] rawHmac = null;
        try {
            Mac mac = Mac.getInstance(signMethod);
            mac.init(signingKey);
            rawHmac = mac.doFinal(toSignString.getBytes());
        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
            e.printStackTrace();
        }
        return Base64.encodeBase64String(rawHmac);
    }
}

ThingBolt代码如下:

package rexel.topo;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Terminal bolt that prints the binary "body" field of every incoming tuple and then
 * acknowledges the tuple. It declares no output fields.
 */
public class ThingBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    // Assigned in prepare(); transient so it is never part of the serialized bolt instance.
    private transient OutputCollector collector;

    /**
     * Stores the collector so that {@link #execute} can acknowledge tuples.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext topology context (unused)
     * @param outputCollector collector used to ack processed tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Prints the tuple's "body" bytes as text and acks the tuple.
     *
     * @param tuple incoming tuple carrying a binary "body" field
     */
    @Override
    public void execute(Tuple tuple) {
        String content = new String(tuple.getBinaryByField("body"));
        System.out.println("receive message: {" + content + "}");
        // BaseRichBolt does not ack on its own. The original never acked, so every tuple
        // emitted with a message id would eventually time out and count as failed.
        collector.ack(tuple);
    }

    /**
     * Declares nothing: this bolt emits no tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

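错误的原因是 Spout 中存在无法序列化的成员变量：Storm 在 createTopology() 时会对 Spout 对象进行 Java 序列化，而 messageListener（lambda 表达式）和 myJmsConnectionListener（匿名内部类）都是在对象创建时就初始化的实例字段，无法被序列化。static 字段不参与序列化，因此在以下两个对象前面加上 static 之后，问题解决。（另一种做法是按异常信息的提示，把这两个对象放到 open() 方法里再创建。）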

private MessageListener messageListener → private static MessageListener messageListener

private JmsConnectionListener myJmsConnectionListener → private static JmsConnectionListener myJmsConnectionListener

原文地址:https://www.cnblogs.com/quchunhui/p/12271533.html