Skip to main content

Definition

Assembly: CloudNimble.SimpleMessageBus.Dispatch.Kafka.dll Namespace: CloudNimble.SimpleMessageBus.Dispatch.Kafka Inheritance: System.Object

Syntax

CloudNimble.SimpleMessageBus.Dispatch.Kafka.KafkaProcessor

Summary

Processes messages from Apache Kafka and dispatches them to registered message handlers.

Remarks

This processor integrates with Azure WebJobs to automatically trigger message processing when messages arrive in a Kafka topic. It handles message deserialization, lifecycle management, and provides proper logging and dependency injection scope for each message. Unlike Azure Storage Queues, Kafka does not require explicit message deletion. The WebJobs Kafka extension automatically commits offsets after successful processing. If processing fails, the message will be reprocessed based on the consumer group’s offset configuration.

Examples

Host.CreateDefaultBuilder()
    .ConfigureServices(services =>
    {
        services.AddSingleton<IMessageHandler, MyMessageHandler>();
    })
    .UseKafkaProcessor(options =>
    {
        options.BrokerList = "localhost:9092";
        options.ConsumerGroup = "my-consumer";
    })
    .UseOrderedMessageDispatcher()
    .Build()
    .Run();

Constructors

.ctor

Creates a new instance of KafkaProcessor.

Syntax

public KafkaProcessor(CloudNimble.SimpleMessageBus.Dispatch.IMessageDispatcher dispatcher, Microsoft.Extensions.DependencyInjection.IServiceScopeFactory serviceScopeFactory)

Parameters

NameTypeDescription
dispatcherCloudNimble.SimpleMessageBus.Dispatch.IMessageDispatcherThe message dispatcher to route messages to handlers.
serviceScopeFactoryMicrosoft.Extensions.DependencyInjection.IServiceScopeFactoryFactory for creating DI scopes per message.

Exceptions

ExceptionDescription
ArgumentNullExceptionThrown when dispatcher or serviceScopeFactory is null.

.ctor Inherited

Inherited from object

Syntax

public Object()

Methods

Equals Inherited Virtual

Inherited from object

Syntax

public virtual bool Equals(object obj)

Parameters

NameTypeDescription
objobject?-

Returns

Type: bool

Equals Inherited

Inherited from object

Syntax

public static bool Equals(object objA, object objB)

Parameters

NameTypeDescription
objAobject?-
objBobject?-

Returns

Type: bool

GetHashCode Inherited Virtual

Inherited from object

Syntax

public virtual int GetHashCode()

Returns

Type: int

GetType Inherited

Inherited from object

Syntax

public System.Type GetType()

Returns

Type: System.Type

MemberwiseClone Inherited

Inherited from object

Syntax

protected internal object MemberwiseClone()

Returns

Type: object

ProcessKafkaMessage

Processes a message from the Kafka topic and dispatches it to registered handlers.

Syntax

public System.Threading.Tasks.Task ProcessKafkaMessage(Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaEventData<string> kafkaEvent, Microsoft.Extensions.Logging.ILogger logger)

Parameters

NameTypeDescription
kafkaEventMicrosoft.Azure.WebJobs.Extensions.Kafka.KafkaEventData<string>The Kafka event containing the message payload.
loggerMicrosoft.Extensions.Logging.ILoggerThe logger instance for this processing operation.

Returns

Type: System.Threading.Tasks.Task A Task representing the asynchronous operation.

Remarks

This method is triggered automatically by the WebJobs Kafka extension when messages arrive. It deserializes the message envelope, sets up processing context, and dispatches to handlers. The offset is automatically committed after successful processing. If an exception is thrown, the offset is not committed and the message will be reprocessed.

ReferenceEquals Inherited

Inherited from object

Syntax

public static bool ReferenceEquals(object objA, object objB)

Parameters

NameTypeDescription
objAobject?-
objBobject?-

Returns

Type: bool

ToString Inherited Virtual

Inherited from object

Syntax

public virtual string ToString()

Returns

Type: string?
  • CloudNimble.SimpleMessageBus.Dispatch.IQueueProcessor