52
52
import org .springframework .cloud .function .context .FunctionProperties .FunctionConfigurationProperties ;
53
53
import org .springframework .cloud .function .context .FunctionRegistration ;
54
54
import org .springframework .cloud .function .context .FunctionRegistry ;
55
+ import org .springframework .cloud .function .context .PostProcessingFunction ;
55
56
import org .springframework .cloud .function .context .config .RoutingFunction ;
56
57
import org .springframework .cloud .function .core .FunctionInvocationHelper ;
57
58
import org .springframework .cloud .function .json .JsonMapper ;
@@ -414,6 +415,10 @@ public class FunctionInvocationWrapper implements Function<Object, Object>, Cons
414
415
415
416
private boolean wrapped ;
416
417
418
+ private final ThreadLocal <Message <Object >> unconvertedResult = new ThreadLocal <>();
419
+
420
+ private PostProcessingFunction postProcessor ;
421
+
417
422
/*
418
423
* This is primarily to support Stream's ability to access
419
424
* un-converted payload (e.g., to evaluate expression on some attribute of a payload)
@@ -425,6 +430,9 @@ public class FunctionInvocationWrapper implements Function<Object, Object>, Cons
425
430
private boolean wrappedBiConsumer ;
426
431
427
432
FunctionInvocationWrapper (String functionDefinition , Object target , Type inputType , Type outputType ) {
433
+ if (target instanceof PostProcessingFunction ) {
434
+ this .postProcessor = (PostProcessingFunction ) target ;
435
+ }
428
436
this .target = target ;
429
437
this .inputType = this .normalizeType (inputType );
430
438
this .outputType = this .normalizeType (outputType );
@@ -441,6 +449,25 @@ public class FunctionInvocationWrapper implements Function<Object, Object>, Cons
441
449
}
442
450
}
443
451
452
+ @ SuppressWarnings ("unchecked" )
453
+ public void postProcess () {
454
+ if (this .postProcessor != null ) {
455
+ Message result = this .unconvertedResult .get ();
456
+ if (result != null ) {
457
+ try {
458
+ this .postProcessor .postProcess (result );
459
+ }
460
+ catch (Exception ex ) {
461
+ logger .warn ("Failed to post process function "
462
+ + this .functionDefinition + "; Result of the invocation before post processing is " + result , ex );
463
+ }
464
+ finally {
465
+ this .unconvertedResult .remove ();
466
+ }
467
+ }
468
+ }
469
+ }
470
+
444
471
public boolean isWrappedBiConsumer () {
445
472
return wrappedBiConsumer ;
446
473
}
@@ -652,6 +679,9 @@ else if (this.outputType == null) {
652
679
String composedName = this .functionDefinition + "|" + afterWrapper .functionDefinition ;
653
680
FunctionInvocationWrapper composedFunction = invocationWrapperInstance (composedName , rawComposedFunction , composedFunctionType );
654
681
composedFunction .composed = true ;
682
+ if (((FunctionInvocationWrapper ) after ).target instanceof PostProcessingFunction ) {
683
+ composedFunction .postProcessor = (PostProcessingFunction ) ((FunctionInvocationWrapper ) after ).target ;
684
+ }
655
685
656
686
return (Function <Object , V >) composedFunction ;
657
687
}
@@ -704,6 +734,10 @@ else if (this.isConsumer()) {
704
734
result = this .invokeFunction (convertedInput );
705
735
}
706
736
737
+ if (this .postProcessor != null ) {
738
+ this .unconvertedResult .set ((Message <Object >) result );
739
+ }
740
+
707
741
if (result != null && this .outputType != null ) {
708
742
result = this .convertOutputIfNecessary (result , this .outputType , this .expectedOutputContentType );
709
743
}
0 commit comments