weixin_39640417
2021-01-12 13:50

Storm and Titan Interop Problems

Hello, I'm writing a Storm bolt that will insert data into a Titan graph backed by Cassandra (the DataStax distribution, ReleaseVersion: 1.2.5).

Titan version: 0.3.1 (depends on Kryo 2.21)
Storm version: 0.8.2 (depends on Kryo 2.17)

I think I have a Catch-22. I can tell the build tool to use either Titan's Kryo dependency or Storm's. Either way, I hit a problem.

Use Titan's Kryo Dependency

First, when I specify that Titan's version of Kryo should be used (version 2.21), I get the following stack trace when I run my Storm topology:


3657 [Thread-7] ERROR backtype.storm.daemon.worker  - Error on initialization of server mk-worker
java.lang.NoSuchMethodError: backtype.storm.serialization.DefaultKryoFactory$KryoSerializableDefault.setReferences(Z)V
    at backtype.storm.serialization.DefaultKryoFactory.getKryo(DefaultKryoFactory.java:32)
    at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:32)
    at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15)
    at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22)
    at backtype.storm.daemon.executor$executor_data$fn__3909.invoke(executor.clj:195)
    at backtype.storm.util$assoc_apply_self.invoke(util.clj:731)
    at backtype.storm.daemon.executor$executor_data.invoke(executor.clj:195)
    at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:263)
    at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349$iter__4354__4358$fn__4359.invoke(worker.clj:354)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$dorun.invoke(core.clj:2725)
    at clojure.core$doall.invoke(core.clj:2741)
    at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349.invoke(worker.clj:354)
    at clojure.lang.AFn.applyToHelper(AFn.java:185)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:601)
    at backtype.storm.daemon.worker$fn__4348$mk_worker__4404.doInvoke(worker.clj:323)
    at clojure.lang.RestFn.invoke(RestFn.java:512)
    at backtype.storm.daemon.supervisor$fn__4807.invoke(supervisor.clj:467)
    at clojure.lang.MultiFn.invoke(MultiFn.java:177)
    at backtype.storm.daemon.supervisor$sync_processes$iter__4684__4688$fn__4689.invoke(supervisor.clj:249)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$dorun.invoke(core.clj:2725)
    at clojure.core$doall.invoke(core.clj:2741)
    at backtype.storm.daemon.supervisor$sync_processes.invoke(supervisor.clj:237)
    at clojure.lang.AFn.applyToHelper(AFn.java:161)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:603)
    at clojure.core$partial$fn__4070.doInvoke(core.clj:2343)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at backtype.storm.event$event_manager$fn__2507.invoke(event.clj:24)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:722)

Storm's DefaultKryoFactory defines a nested class named KryoSerializableDefault which extends Kryo. DefaultKryoFactory calls Kryo's setReferences method.

Kryo's setReferences method currently has the following signature:


public boolean setReferences (boolean references)

On August 17, 2012, nathan.sweet checked in a change that switched the return type of setReferences from void to boolean.

You'll note that the stack trace above shows the missing method to be:


KryoSerializableDefault.setReferences(Z)V

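where the V stands for void and Z stands for boolean (these are JVM type descriptor codes). In other words, Storm's bytecode was compiled against a Kryo in which setReferences returned void, so at run time it looks for setReferences(Z)V, and Kryo 2.21 only provides the boolean-returning method.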
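The mismatch can be reproduced without Storm or Kryo. The following is a minimal sketch, not Storm's or Kryo's actual code: DescriptorDemo and NewStyle are hypothetical names, and NewStyle only stands in for a class whose setReferences returns boolean. It prints the two descriptors and shows that a lookup by name plus the old void-returning type fails, which is the same kind of exact-descriptor match the JVM performs when linking a call.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

final class DescriptorDemo {
    // Hypothetical stand-in for the newer class: setReferences returns boolean.
    // What the returned value means here is illustrative only.
    public static class NewStyle {
        private boolean references;

        public boolean setReferences(boolean references) {
            this.references = references;
            return this.references;
        }
    }

    /** JVM descriptor of a method that takes one boolean and returns returnType. */
    static String descriptor(Class<?> returnType) {
        return MethodType.methodType(returnType, boolean.class).toMethodDescriptorString();
    }

    /** True if NewStyle has setReferences(boolean) with exactly this return type. */
    static boolean resolves(Class<?> returnType) {
        try {
            MethodHandles.lookup().findVirtual(
                    NewStyle.class,
                    "setReferences",
                    MethodType.methodType(returnType, boolean.class));
            return true;
        } catch (NoSuchMethodException | IllegalAccessException e) {
            // The name matches, but the full descriptor (including return type) does not.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(descriptor(void.class));    // (Z)V, what the caller was compiled against
        System.out.println(descriptor(boolean.class)); // (Z)Z, what the newer class provides
        System.out.println(resolves(void.class));      // false
        System.out.println(resolves(boolean.class));   // true
    }
}
```

Because the return type is part of the descriptor, changing void to boolean is source compatible for most callers but not binary compatible: code compiled against the old signature has to be recompiled.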

Use Storm's Kryo Dependency

When I use Storm's Kryo dependency (version 2.17), drop the titan keyspace in cassandra-cli, and then run my Storm topology, everything seems to work fine at first. But when I try to view my database with gremlin, I start having problems.

I start up gremlin and enter the following:


 c = new BaseConfiguration() 
 c.setProperty("storage.backend", "cassandra")
 c.setProperty("storage.hostname", "127.0.0.1")
 g = TitanFactory.open(c)

So far, everything works great. So I want to see how many vertices I have:


g.V.count()

And I get this:


13/06/13 11:51:00 INFO thrift.ThriftKeyspaceImpl: Detected partitioner org.apache.cassandra.dht.Murmur3Partitioner for keyspace titan
java.lang.IllegalArgumentException: Can not set com.thinkaurelius.titan.core.TypeGroup field com.thinkaurelius.titan.graphdb.types.AbstractTypeDefinition.group to com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition
Serialization trace:
group (com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition)

I hit Y to see the stack trace and I get this:


com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set com.thinkaurelius.titan.core.TypeGroup field com.thinkaurelius.titan.graphdb.types.AbstractTypeDefinition.group to com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition
Serialization trace:
group (com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
    at com.thinkaurelius.titan.graphdb.database.serialize.kryo.KryoSerializer.readObjectNotNull(KryoSerializer.java:109)
    at com.thinkaurelius.titan.graphdb.database.EdgeSerializer.parseProperties(EdgeSerializer.java:176)
    at com.thinkaurelius.titan.graphdb.database.EdgeSerializer.getProperties(EdgeSerializer.java:120)
    at com.thinkaurelius.titan.graphdb.database.EdgeSerializer.readRelation(EdgeSerializer.java:65)
    at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$5$3.apply(StandardTitanTx.java:599)
    at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$5$3.apply(StandardTitanTx.java:595)
    at com.google.common.collect.Iterators$9.transform(Iterators.java:893)
    at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
    at com.thinkaurelius.titan.graphdb.query.QueryProcessor$OuterIterator.nextInternal(QueryProcessor.java:148)
    at com.thinkaurelius.titan.graphdb.query.QueryProcessor$OuterIterator.<init>(QueryProcessor.java:137)
    at com.thinkaurelius.titan.graphdb.query.QueryProcessor.iterator(QueryProcessor.java:48)
    at com.google.common.collect.Iterables$7.iterator(Iterables.java:611)
    at com.google.common.collect.Iterables.getOnlyElement(Iterables.java:282)
    at com.thinkaurelius.titan.graphdb.query.QueryUtil.queryHiddenUniqueProperty(QueryUtil.java:16)
    at com.thinkaurelius.titan.graphdb.types.vertices.TitanKeyVertex.getDefinition(TitanKeyVertex.java:24)
    at com.thinkaurelius.titan.graphdb.types.vertices.TitanKeyVertex.getDefinition(TitanKeyVertex.java:11)
    at com.thinkaurelius.titan.graphdb.types.vertices.TitanTypeVertex.getName(TitanTypeVertex.java:20)
    at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$3.get(StandardTitanTx.java:241)
    at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$3.get(StandardTitanTx.java:225)
    at com.thinkaurelius.titan.graphdb.transaction.vertexcache.SimpleVertexCache.get(SimpleVertexCache.java:32)
    at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx.getExistingVertex(StandardTitanTx.java:221)
    at com.thinkaurelius.titan.graphdb.transaction.VertexIterable$1.nextVertex(VertexIterable.java:39)
    at com.thinkaurelius.titan.graphdb.transaction.VertexIterable$1.next(VertexIterable.java:58)
    at com.thinkaurelius.titan.graphdb.transaction.VertexIterable$1.next(VertexIterable.java:29)
    at com.tinkerpop.pipes.util.iterators.HistoryIterator.next(HistoryIterator.java:25)
    at com.tinkerpop.pipes.IdentityPipe.processNextStart(IdentityPipe.java:19)
    at com.tinkerpop.pipes.AbstractPipe.next(AbstractPipe.java:89)
    at com.tinkerpop.pipes.util.Pipeline.next(Pipeline.java:115)
    at com.tinkerpop.pipes.util.PipeHelper.counter(PipeHelper.java:108)
    at com.tinkerpop.gremlin.java.GremlinPipeline.count(GremlinPipeline.java:1397)
    at com.tinkerpop.pipes.util.PipesFluentPipeline$count.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:112)
    at groovysh_evaluate.run(groovysh_evaluate:56)
    at groovysh_evaluate$run.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42)
    at groovysh_evaluate$run.call(Unknown Source)
    at org.codehaus.groovy.tools.shell.Interpreter.evaluate(Interpreter.groovy:67)
    at org.codehaus.groovy.tools.shell.Interpreter$evaluate.call(Unknown Source)
    at org.codehaus.groovy.tools.shell.Groovysh.execute(Groovysh.groovy:152)
    at org.codehaus.groovy.tools.shell.Shell.leftShift(Shell.groovy:114)
    at org.codehaus.groovy.tools.shell.Shell$leftShift$0.call(Unknown Source)
    at org.codehaus.groovy.tools.shell.ShellRunner.work(ShellRunner.groovy:88)
    at org.codehaus.groovy.tools.shell.InteractiveShellRunner.super$2$work(InteractiveShellRunner.groovy)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90)
    at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:233)
    at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1079)
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuperN(ScriptBytecodeAdapter.java:128)
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuper0(ScriptBytecodeAdapter.java:148)
    at org.codehaus.groovy.tools.shell.InteractiveShellRunner.work(InteractiveShellRunner.groovy:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite$PogoCachedMethodSiteNoUnwrapNoCoerce.invoke(PogoMetaMethodSite.java:272)
    at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite.callCurrent(PogoMetaMethodSite.java:52)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:137)
    at org.codehaus.groovy.tools.shell.ShellRunner.run(ShellRunner.groovy:57)
    at org.codehaus.groovy.tools.shell.InteractiveShellRunner.super$2$run(InteractiveShellRunner.groovy)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90)
    at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:233)
    at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1079)
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuperN(ScriptBytecodeAdapter.java:128)
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuper0(ScriptBytecodeAdapter.java:148)
    at org.codehaus.groovy.tools.shell.InteractiveShellRunner.run(InteractiveShellRunner.groovy:66)
    at com.thinkaurelius.titan.tinkerpop.gremlin.Console.<init>(Console.java:65)
    at com.thinkaurelius.titan.tinkerpop.gremlin.Console.<init>(Console.java:78)
    at com.thinkaurelius.titan.tinkerpop.gremlin.Console.main(Console.java:104)
Caused by: java.lang.IllegalArgumentException: Can not set com.thinkaurelius.titan.core.TypeGroup field com.thinkaurelius.titan.graphdb.types.AbstractTypeDefinition.group to com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:164)
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:168)
    at sun.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
    at java.lang.reflect.Field.set(Field.java:680)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:619)
    ... 79 more

This is where I get confused. If we're just using one version of Kryo, then you'd think that we would be able to read anything we had written (unless there was a bug in Kryo).

Do you know of any workarounds, or fixes I could employ here?

Thanks!

Tim Stewart

This question comes from the open-source project: thinkaurelius/titan

9 answers

  • weixin_39621185 4 months ago

    Your analysis of both Kryo version configurations looks sound and complete to me. Is there anything else we can do on this issue?

  • weixin_39640417 4 months ago

    Thanks Dan! I'm not a seasoned Java developer. Is overwriting the JAR file an acceptable solution for me to use? For all I know Titan depends on something released in version 2.21 of Kryo (e.g. a performance improvement in Kryo).

    Was there a binary compatibility issue related to serialization introduced by Kryo between version 2.17 and 2.21? It might be nice to know how important binary compatibility is to Kryo because that could make staying current a pain for the fantastic folks developing Titan and potentially for Titan's users.

    Thanks again!

  • weixin_39753747 4 months ago

    Thanks for explaining everything. I had the same issue and thought I had forgotten to implement Serializable in one of my Storm classes, but that was not the cause.

    Rolling back to Kryo 2.17 on both sides solved the problem.

  • weixin_39720865 4 months ago

    Thanks for this analysis. We just ran into the same problem. Good work.

  • weixin_39899776 4 months ago

    Looks like this is resolved with a nice discussion and can be closed. Recommend closing.

  • weixin_39983383 4 months ago

    Was there anything more to do here? I kinda lost track.

  • weixin_39983383 4 months ago

    Closing - pending specific feedback that we need to keep this open for something.

  • weixin_39640417 4 months ago

    "If we're just using one version of Kryo, then you'd think that we would be able to read anything we had written (unless there was a bug in Kryo)."

    Ah! I'm not using one version of Kryo. The Storm topology writes with Kryo 2.17, but when I run gremlin.sh, it reads with the 2.21 version of Kryo. I'll try dropping the 2.17 version of Kryo into the Titan install. This implies that Kryo might have introduced a binary incompatibility in its serialized format between 2.17 and 2.21.
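One way to confirm which Kryo jar a given JVM has actually loaded is to ask the class for its code source. The following is a minimal sketch; WhichJar and its locate helper are made-up names for illustration, and the only library detail assumed is the class name com.esotericsoftware.kryo.Kryo, which appears in the stack trace above.

```java
import java.security.CodeSource;

final class WhichJar {
    /** Location the named class was loaded from, or a short note if that is unavailable. */
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // JDK classes loaded by the bootstrap loader usually have no code source.
                return "no code source (typically a JDK bootstrap class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // Defaults to the Kryo class; pass another fully qualified name to check it instead.
        String name = args.length > 0 ? args[0] : "com.esotericsoftware.kryo.Kryo";
        System.out.println(name + " -> " + locate(name));
    }
}
```

Run with the same classpath as the process being checked (the Storm worker on one side, the Titan install used by gremlin.sh on the other), the printed jar path should show whether both sides resolve to the same Kryo version. Since the Gremlin console is a Groovy shell, the same getProtectionDomain().getCodeSource().getLocation() chain can presumably be typed there directly.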

  • weixin_39640417 4 months ago

    That fixed the problem.
