Skip to content

Decouple TestkitState from CommandProcessor. #993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private void handleClient( Socket clientSocket )
System.out.println( "Handling connection from: " + clientSocket.getRemoteSocketAddress() );
BufferedReader in = new BufferedReader( new InputStreamReader( clientSocket.getInputStream() ) );
BufferedWriter out = new BufferedWriter( new OutputStreamWriter( clientSocket.getOutputStream() ) );
CommandProcessor commandProcessor = new CommandProcessor( in, out );
CommandProcessor commandProcessor = new DefaultCommandProcessor( in, out );

boolean cont = true;
while ( cont )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,227 +18,42 @@
*/
package neo4j.org.testkit.backend;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import neo4j.org.testkit.backend.messages.TestkitModule;
import neo4j.org.testkit.backend.messages.requests.TestkitRequest;
import neo4j.org.testkit.backend.messages.responses.BackendError;
import neo4j.org.testkit.backend.messages.responses.DriverError;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;

import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.UntrustedServerException;
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;

public class CommandProcessor
public interface CommandProcessor
{
private final TestkitState testkitState;

private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Used in ObjectMapper's injectable values.
*/
String COMMAND_PROCESSOR_ID = "commandProcessor";

/**
* Reads one request and writes the response. Returns false when not able to read anymore.
*
* @return False when there's nothing to read anymore.
*/
boolean process();

/**
* Create a new {@link ObjectMapper} configured with the appropriate testkit module and an injectable {@link CommandProcessor}.
* @param processor The processor supposed to be injectable
* @return A reusable object mapper instance
*/
static ObjectMapper newObjectMapperFor(CommandProcessor processor) {

final ObjectMapper objectMapper = new ObjectMapper();

private final BufferedReader in;
private final BufferedWriter out;

public CommandProcessor( BufferedReader in, BufferedWriter out )
{
this.in = in;
this.out = out;
configureObjectMapper();
this.testkitState = new TestkitState( this::writeResponse, this::process );
}

private void configureObjectMapper()
{
TestkitModule testkitModule = new TestkitModule();
this.objectMapper.registerModule( testkitModule );
this.objectMapper.disable( DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES );
}

private String readLine()
{
try
{
return this.in.readLine();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

private void write( String s )
{
try
{
this.out.write( s );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

// Logs to frontend
private void log( String s )
{
try
{
this.out.write( s + "\n" );
this.out.flush();
}
catch ( IOException e )
{
}
System.out.println( s );
}

private void flush()
{
try
{
this.out.flush();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

// Reads one request and writes the response. Returns false when not able to read anymore.
public boolean process()
{
boolean inRequest = false;
StringBuilder request = new StringBuilder();

log( "Waiting for request" );

while ( true )
{
String currentLine = readLine();
// End of stream
if ( currentLine == null )
{
return false;
}

if ( currentLine.equals( "#request begin" ) )
{
inRequest = true;
}
else if ( currentLine.equals( "#request end" ) )
{
if ( !inRequest )
{
throw new RuntimeException( "Request end not expected" );
}
try
{
processRequest( request.toString() );
}
catch ( Exception e )
{
if ( e instanceof Neo4jException )
{
// Error to track
String id = testkitState.newId();
testkitState.getErrors().put( id, (Neo4jException) e );
writeResponse( driverError( id, (Neo4jException) e ) );
System.out.println( "Neo4jException: " + e );
}
else if ( isConnectionPoolClosedException( e ) || e instanceof UntrustedServerException )
{
String id = testkitState.newId();
DriverError driverError = DriverError.builder()
.data(
DriverError.DriverErrorBody.builder()
.id( id )
.errorType( e.getClass().getName() )
.msg( e.getMessage() )
.build()
)
.build();
writeResponse( driverError );
}
else
{
// Unknown error, interpret this as a backend error.
// Report to frontend and rethrow, note that if socket been
// closed the writing will throw itself...
writeResponse( BackendError.builder().data( BackendError.BackendErrorBody.builder().msg( e.toString() ).build() ).build() );
// This won't print if there was an IO exception since line above will rethrow
e.printStackTrace();
throw e;
}
}
return true;
}
else
{
if ( !inRequest )
{
throw new RuntimeException( "Command Received whilst not in request" );
}
request.append( currentLine );
}
}
}

private DriverError driverError( String id, Neo4jException e )
{
return DriverError.builder().data(
DriverError.DriverErrorBody.builder()
.id( id )
.errorType( e.getClass().getName() )
.code( e.code() )
.msg( e.getMessage() )
.build() )
.build();
}

public void processRequest( String request )
{
System.out.println( "request = " + request + ", in = " + in + ", out = " + out );
try
{
TestkitRequest testkitMessage = objectMapper.readValue( request, TestkitRequest.class );
TestkitResponse response = testkitMessage.process( testkitState );
if ( response != null )
{
writeResponse( response );
}
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

private void writeResponse( TestkitResponse response )
{
try
{
String responseStr = objectMapper.writeValueAsString( response );
System.out.println("response = " + responseStr + ", in = " + in + ", out = " + out);
write( "#response begin\n" );
write( responseStr + "\n" );
write( "#response end\n" );
flush();
}
catch ( JsonProcessingException ex )
{
throw new RuntimeException( "Error writing response", ex );
}
}
objectMapper.registerModule( testkitModule );
objectMapper.disable( DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES );

private boolean isConnectionPoolClosedException( Exception e )
{
return e instanceof IllegalStateException && e.getMessage() != null &&
e.getMessage().equals( ConnectionPoolImpl.CONNECTION_POOL_CLOSED_ERROR_MESSAGE );
InjectableValues injectableValues = new InjectableValues.Std( Collections.singletonMap( COMMAND_PROCESSOR_ID, processor ) );
objectMapper.setInjectableValues( injectableValues );
return objectMapper;
}
}
Loading