streams.sink.start() has no effect unless results are iterated #465

Open
ivangreene opened this issue May 5, 2021 · 7 comments
Labels
bug Something isn't working

Comments

Contributor

ivangreene commented May 5, 2021

Expected Behavior (Mandatory)

Calling streams.sink.start() should start the sink, regardless of whether the results are iterated

Actual Behavior (Mandatory)

The sink is not started unless the results are iterated

How to Reproduce the Problem

Steps (Mandatory)

  1. Call streams.sink.start from within another procedure, using GraphDatabaseService#executeTransactionally:
     graphDatabaseService.executeTransactionally("CALL streams.sink.start();");
  2. Observe that the sink is not started by checking the logs
  3. This code will actually work to start it:
     graphDatabaseService.executeTransactionally("CALL streams.sink.start();", Map.of(), r -> r.stream().collect(Collectors.toList()));

This appears to be because the procedure returns a Stream, and the call to start only happens inside a stream stage, which is skipped if the stream is never consumed: https://github.com/neo4j-contrib/neo4j-streams/blob/3.5/consumer/src/main/kotlin/streams/procedures/StreamsSinkProcedures.kt#L50-L62
I don't really know Kotlin, but I was able to decompile it to Java and see this:

   @Procedure("streams.sink.start")
   @NotNull
   public final Stream sinkStart() {
      this.checkEnabled();
      return this.checkLeader((Function0)(new Function0() {
         // $FF: synthetic method
         // $FF: bridge method
         public Object invoke() {
            return this.invoke();
         }

         @NotNull
         public final Stream invoke() {
            Stream var1;
            try {
               StreamsSinkProcedures.this.getStreamsEventSink().start();
               var1 = StreamsSinkProcedures.this.sinkStatus();
            } catch (Exception var3) {
               Log var10000 = StreamsSinkProcedures.this.log;
               if (var10000 != null) {
                  var10000.error("Cannot start the Sink because of the following exception", (Throwable)var3);
               }

               Stream var4 = Stream.concat(StreamsSinkProcedures.this.sinkStatus(), Stream.of(new KeyValueResult("exception", ExceptionUtils.getStackTrace((Throwable)var3))));
               Intrinsics.checkNotNullExpressionValue(var4, "Stream.concat(sinkStatus…Utils.getStackTrace(e))))");
               var1 = var4;
            }

            return var1;
         }
      }));
   }
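
The general effect is easy to reproduce with plain java.util.stream, independent of Neo4j. This is a minimal sketch, assuming (as suspected above) that the start call ends up inside a lazily evaluated stage. The class LazyStageDemo, the method lazyStart and the started flag are hypothetical stand-ins, not part of neo4j-streams:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStageDemo {
    // Hypothetical stand-in for the sink's "started" state.
    static final AtomicBoolean started = new AtomicBoolean(false);

    // Returns a stream whose side effect lives inside an intermediate stage.
    static Stream<String> lazyStart() {
        return Stream.of("status")
                .map(s -> {
                    // Runs only when a terminal operation pulls this element.
                    started.set(true);
                    return s + ": RUNNING";
                });
    }

    public static void main(String[] args) {
        // Build the pipeline but never consume it: the map stage does not run.
        Stream<String> unconsumed = lazyStart();
        System.out.println("not consumed: started=" + started.get());

        // A terminal operation (collect) forces the stage to run.
        List<String> rows = unconsumed.collect(Collectors.toList());
        System.out.println("consumed:     started=" + started.get() + " rows=" + rows);
    }
}
```

Here the flag stays false until collect is called, which matches the observation that the sink only starts when the results are iterated.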

Specifications (Mandatory)

Currently used versions

Versions

  • OS: macOS
  • Neo4j: 4.1.5
  • Neo4j-Streams: 4.0.6
Contributor Author

ivangreene commented May 5, 2021

It is worth noting that GraphDatabaseService.executeTransactionally(String) does indeed auto-close the Result, but closing it does not seem to be enough to cause the Stream to be consumed. This makes sense, since a Stream may be very large and consuming it on close would be wasteful:
https://github.com/neo4j/neo4j/blob/4.2/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/GraphDatabaseFacade.java#L164-L168
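
For plain java.util.stream, closing and consuming are indeed separate things: close() runs the registered close handlers but does not pull any elements through the pipeline. The sketch below shows only that standard-library behavior. Whether Neo4j's Result behaves the same way internally is an assumption here, and the class and field names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class CloseWithoutConsumeDemo {
    static final AtomicBoolean stageRan = new AtomicBoolean(false);
    static final AtomicBoolean closeHandlerRan = new AtomicBoolean(false);

    // A pipeline with one intermediate stage and one close handler.
    static Stream<String> pipeline() {
        return Stream.of("row")
                .peek(r -> stageRan.set(true))
                .onClose(() -> closeHandlerRan.set(true));
    }

    public static void main(String[] args) {
        // try-with-resources closes the stream; there is no terminal operation.
        try (Stream<String> s = pipeline()) {
            // intentionally empty
        }
        System.out.println("close handler ran: " + closeHandlerRan.get());
        System.out.println("peek stage ran:    " + stageRan.get());
    }
}
```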

Contributor

moxious commented May 14, 2021

@ivangreene we are investigating this. We think this may be a limitation in the semantics/operation of executeTransactionally, but are not yet sure. Will follow up.

Contributor

moxious commented May 14, 2021

Underlying implementation is here:

Kernel team has indicated that consumption or non-consumption of the results should not affect whether the TX executes or not.

@ivangreene
Contributor Author

@moxious It makes sense that executeTransactionally should not consume an entire stream if all of the results are not needed, although that behavior may have surprises like this. I wish I understood Kotlin so I could open a PR, but I don't know how to move the call to 'start' outside of the stream operation 😄

Contributor

moxious commented May 14, 2021

@ivangreene starting the sink does indeed have different threading consequences, which is a confounding factor here, but you can't really move that call outside of the path. Otherwise, if starting the consumption service failed (for whatever reason), it would be impossible to be notified of that in the return of the proc. Moving it outside would make the proc blind to the success or failure of the call, and would make the returned status meaningless.

Contributor

moxious commented May 14, 2021

This will likely require us to repro and do some investigation. For the moment it is good enough that the workaround is trivial (just consume the results).

moxious added the bug label May 14, 2021
@ivangreene
Contributor Author

Hmm... so you need some way to call start() immediately, but also return an error in the result Stream if the call to start() fails. Yep I have no idea how to do this in Kotlin 😄 . Ok well thanks for the attention to the issue, and let me know if you have any problems reproducing it
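
One way to get both properties, sketched in Java rather than Kotlin: run start() eagerly inside a try/catch, remember any failure, and only then build the Stream that is returned. This is an illustration under assumptions, not the project's fix. The Sink interface, FakeSink and the string rows are hypothetical stand-ins for the real sink and KeyValueResult, and the sketch ignores the leader check and the threading concerns mentioned above:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EagerStartSketch {
    // Hypothetical stand-in for the sink; not the neo4j-streams API.
    interface Sink {
        void start() throws Exception;
        String status();
    }

    // Starts the sink immediately; any failure is still reported as a row.
    static Stream<String> sinkStart(Sink sink) {
        String error = null;
        try {
            sink.start(); // executes now, before any stream exists
        } catch (Exception e) {
            error = e.toString();
        }
        Stream<String> status = Stream.of("status=" + sink.status());
        return error == null
                ? status
                : Stream.concat(status, Stream.of("exception=" + error));
    }

    // Test double that counts successful starts or fails on demand.
    static final class FakeSink implements Sink {
        final boolean fail;
        final AtomicInteger starts = new AtomicInteger();

        FakeSink(boolean fail) { this.fail = fail; }

        public void start() throws Exception {
            if (fail) throw new IllegalStateException("boom");
            starts.incrementAndGet();
        }

        public String status() { return starts.get() > 0 ? "RUNNING" : "STOPPED"; }
    }

    public static void main(String[] args) {
        FakeSink ok = new FakeSink(false);
        sinkStart(ok); // result deliberately not consumed
        System.out.println("starts without consuming: " + ok.starts.get());

        List<String> rows = sinkStart(new FakeSink(true)).collect(Collectors.toList());
        System.out.println("rows on failure: " + rows);
    }
}
```

In this sketch the caller still sees the exception row when the results are read, while the start attempt itself no longer depends on the stream being consumed.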
