You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Offsets shall be committed even if all records in a poll are skipped by interceptors.
Current Behavior
Offsets are not committed when all records in a poll are skipped by interceptors, causing lags.
Context
In our project, we skip huge amounts of records (company-wide topics where only a fraction is important to us).
Therefore, polls often contain records that are all skipped by interceptors.
In such cases, partition offsets are not committed and we observe enormous lags (hundreds of thousands).
This behaviour effectively prevents us from detecting slow consumers.
The only workaround is to avoid interceptors and implement a custom CompositeRecordFilterStrategy that does the same thing.
However, we need to have our custom LoggingInterceptor in place (begins a new XRay Segment on intercept and ends it on afterRecord) that logs only non-skipped records.
Since the interceptors are called prior to the filters, we're forced into an ugly workaround where the LoggingInterceptor will have to call the CompositeRecordFilterStrategy to verify if a message will be filtered out and log those non-filtered.
Expected Behavior
Offsets shall be committed even if all records in a poll are skipped by interceptors.
Current Behavior
Offsets are not committed when all records in a poll are skipped by interceptors, causing lags.
Context
In our project, we skip huge amounts of records (company-wide topics where only a fraction is important to us).
Therefore, polls often contain records that are all skipped by interceptors.
In such cases, partition offsets are not committed and we observe enormous lags (hundreds of thousands).
This behaviour effectively prevents us from detecting slow consumers.
The only workaround is to avoid interceptors and implement a custom
CompositeRecordFilterStrategy
that does the same thing.However, we need to have our custom
LoggingInterceptor
in place (begins a new XRay Segment onintercept
and ends it onafterRecord
) that logs only non-skipped records.Since the interceptors are called prior to the filters, we're forced into an ugly workaround where the
LoggingInterceptor
will have to call theCompositeRecordFilterStrategy
to verify if a message will be filtered out and log those non-filtered.This issue is created after a discussion with @garyrussell on StackOverflow.
The text was updated successfully, but these errors were encountered: