Your First Worker With .Net
This guide walks you through creating a custom .Net Milvaion worker from scratch. By the end, you'll have a working worker with a custom job that you can deploy.
Prerequisites
- .NET 10 SDK installed
- Milvaion stack running (see Quick Start)
- Basic C# knowledge
Step 1: Install the Worker Template
Milvaion provides project templates for quick setup:
dotnet new install Milvasoft.Templates.Milvaion
Verify installation:
dotnet new list milvaion
You should see:
Template Name Short Name Language Tags
----------------------- ----------------------- -------- -----------------------
Milvaion Api Worker milvaion-api-worker [C#] Api/Worker/Milvaion
Milvaion Console Worker milvaion-console-worker [C#] Console/Worker/Milvaion
Step 2: Create a New Worker Project
dotnet new milvaion-console-worker -n MyCompany.BillingWorker
cd MyCompany.BillingWorker
This creates:
MyCompany.BillingWorker/
├── Program.cs # Entry point
├── appsettings.json # Configuration
├── appsettings.Development.json # Dev config
|
├── Jobs/
| └── SampleJob.cs # Example job
|
├── Dockerfile # Container build
└── MyCompany.BillingWorker.csproj
Step 3: Configure the Worker
Edit appsettings.json to point to your Milvaion infrastructure:
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Debug",
"System": "Debug"
}
},
"Worker": {
"WorkerId": "sample-worker-01",
"MaxParallelJobs": 128,
"ExecutionTimeoutSeconds": 300,
"RabbitMQ": {
"Host": "rabbitmq",
"Port": 5672,
"Username": "guest",
"Password": "guest",
"VirtualHost": "/"
},
"Redis": {
"ConnectionString": "redis:6379",
"Password": "",
"Database": 0,
"CancellationChannel": "Milvaion:JobScheduler:cancellation_channel"
},
"Heartbeat": {
"Enabled": true,
"IntervalSeconds": 5
},
"OfflineResilience": {
"Enabled": true,
"LocalStoragePath": "./worker_data",
"SyncIntervalSeconds": 30,
"MaxSyncRetries": 3,
"CleanupIntervalHours": 1,
"RecordRetentionDays": 1
}
},
"JobConsumers": {
"SimpleJob": {
"ConsumerId": "simple-consumer",
"MaxParallelJobs": 32,
"ExecutionTimeoutSeconds": 120,
"MaxRetries": 3,
"BaseRetryDelaySeconds": 5,
"LogUserFriendlyLogsViaLogger": true
},
"SendEmailJob": {
"ConsumerId": "email-consumer",
"MaxParallelJobs": 16,
"ExecutionTimeoutSeconds": 600,
"MaxRetries": 3,
"BaseRetryDelaySeconds": 5,
"LogUserFriendlyLogsViaLogger": true
}
}
}
Note: For Docker, use container names (
rabbitmq,redis) instead oflocalhost.
Step 4: Create Your First Job
Create Jobs/GenerateInvoiceJob.cs:
using System.Text.Json;
using Milvasoft.Milvaion.Sdk.Worker.Abstractions;
namespace MyCompany.BillingWorker.Jobs;
public class GenerateInvoiceJob : IAsyncJob<InvoiceJobData>
{
public async Task ExecuteAsync(IJobContext context)
{
// 1. Log start
context.LogInformation("Starting invoice generation job");
// 2. Get job data
var data = context.GetData<InvoiceJobData>();
if (data == null || data.OrderId <= 0)
{
context.LogError("Invalid OrderId");
throw new ArgumentException("OrderId is required");
}
context.LogInformation($"Generating invoice for OrderId: {data.OrderId}");
// 3. Cancellation check
context.CancellationToken.ThrowIfCancellationRequested();
// 4. Simulate invoice generation
await Task.Delay(3000, context.CancellationToken);
// 5. Finish
context.LogInformation($"Invoice successfully generated for OrderId: {data.OrderId}");
}
}
/// <summary>
/// Invoice job data definition.
/// This schema is automatically discovered and displayed in the dashboard.
/// </summary>
public class InvoiceJobData
{
/// <summary>
/// The order ID to generate invoice for.
/// </summary>
[Required]
[Description("The order identifier to generate an invoice for")]
public int OrderId { get; set; }
/// <summary>
/// Currency code for the invoice.
/// </summary>
[DefaultValue("USD")]
[Description("The currency code (e.g., 'USD', 'EUR', 'TRY')")]
public string Currency { get; set; } = "USD";
}
Step 5: Register the Job
Add configuration for your new job in appsettings.json:
{
"JobConsumers": {
"GenerateInvoiceJob": {
"ConsumerId": "invoice-consumer",
"MaxParallelJobs": 8,
"ExecutionTimeoutSeconds": 300,
"MaxRetries": 5,
"BaseRetryDelaySeconds": 10,
"LogUserFriendlyLogsViaLogger": true
}
}
}
The SDK automatically discovers jobs that implement any of the job interfaces:
- Non-generic:
IJob,IAsyncJob,IJobWithResult,IAsyncJobWithResult - Generic (typed data):
IJob<TJobData>,IAsyncJob<TJobData>,IJobWithResult<TJobData>,IAsyncJobWithResult<TJobData>
⚠️ Note: Synchronous jobs (
IJob,IJobWithResult) do not support cancellation. For cancellation support, use the async variants.
Step 6: Run the Worker
Locally
dotnet run
Expected output:
info: Milvaion.Sdk.Worker[0]
Registered job: GenerateInvoiceJob → MyCompany.BillingWorker.Jobs.GenerateInvoiceJob
info: Milvaion.Worker.Job[0]
Starting invoice generation job
info: Milvaion.Worker.Job[0]
Generating invoice for OrderId: 12345
info: Milvaion.Worker.Job[0]
Invoice successfully generated for OrderId: 12345
With Docker
Build and run:
Find existing Milvaion network name;
docker inspect milvaion-api --format='{{json .NetworkSettings.Networks}}'
docker build -t my-billing-worker .
docker run -d --name billing-worker \
--network milvaion-quick_milvaion-network \
my-billing-worker
Step 7: Test Your Job
Create the Job via API
curl -X POST http://localhost:5000/api/v1/jobs/job \
-H "Content-Type: application/json" \
-d '{
"displayName": "Generate Invoice",
"workerId": "billing-worker",
"selectedJobName": "GenerateInvoiceJob",
"cronExpression": "0 */5 * * * *",
"isActive": true,
"jobData": "{\"orderId\": 12345, \"currency\": \"EUR\"}"
}'
Trigger Immediately
curl -X POST http://localhost:5000/api/v1/jobs/job/trigger \
-H "Content-Type: application/json" \
-d '{"jobId": "YOUR_JOB_ID", "reason": "Testing", "force": true}'
Watch Execution
# Worker logs
docker logs -f billing-worker
# Or if running locally
dotnet run
Check in Dashboard
- Open http://localhost:5000
- Go to Jobs → Click your job
- See Execution History with logs
Understanding the Code
Program.cs (Entry Point)
using Microsoft.Extensions.Hosting;
using Milvasoft.Milvaion.Sdk.Worker;
var builder = Host.CreateApplicationBuilder(args);
// Register Worker SDK - auto-discovers IJob implementations
builder.Services.AddMilvaionWorkerWithJobs(builder.Configuration);
var host = builder.Build();
await host.RunAsync();
IJobContext
Every job receives a context object:
public interface IJobContext
{
// Unique ID for this execution (for tracing)
Guid CorrelationId { get; }
// The job definition (name, data, etc.)
ScheduledJob Job { get; }
// Which worker is running this
string WorkerId { get; }
// Cancel when shutdown requested
CancellationToken CancellationToken { get; }
/// <summary>
/// Deserializes and returns the job data as the specified type.
/// Uses the Job.JobData JSON string.
/// </summary>
/// <typeparam name="T">The type to deserialize to. Must be a class.</typeparam>
/// <returns>Deserialized job data or default if null/empty</returns>
T GetData<T>() where T : class;
// Logging methods (logs go to dashboard)
void LogInformation(string message);
void LogWarning(string message);
void LogError(string message, Exception ex = null);
}
Job Interfaces Hierarchy
All job interfaces inherit from IJobBase:
// Base interface for all job types
public interface IJobBase { }
// Non-generic interfaces
public interface IJob : IJobBase { ... }
public interface IJobWithResult : IJobBase { ... }
public interface IAsyncJob : IJobBase { ... }
public interface IAsyncJobWithResult : IJobBase { ... }
// Generic interfaces (for typed job data)
public interface IJob<TJobData> : IJob where TJobData : class, new() { }
public interface IJobWithResult<TJobData> : IJobWithResult where TJobData : class, new() { }
public interface IAsyncJob<TJobData> : IAsyncJob where TJobData : class, new() { }
public interface IAsyncJobWithResult<TJobData> : IAsyncJobWithResult where TJobData : class, new() { }
Job Data
Jobs receive data as JSON in context.Job.JobData. Use attributes to define the schema that will be displayed in the dashboard:
using System.ComponentModel;
using System.ComponentModel.DataAnnotations;
/// <summary>
/// Job data definition with schema metadata.
/// The schema is automatically discovered and shown in the UI.
/// </summary>
public class MyJobData
{
/// <summary>
/// The customer identifier.
/// </summary>
[Required]
[Description("The unique customer identifier")]
public string CustomerId { get; set; }
/// <summary>
/// The order identifier.
/// </summary>
[Required]
[Description("The order ID to process")]
public int OrderId { get; set; }
}
// Deserialize in your job
var data = JsonSerializer.Deserialize<MyJobData>(context.Job.JobData ?? "{}");
//or
var jobData = context.GetData<MyJobData>();
💡 Schema Discovery: Milvaion automatically discovers job data classes and sends their schema to the scheduler. The
[Required],[Description], and[DefaultValue]attributes provide metadata that is displayed in the dashboard, helping users understand what data each job expects.
Adding Dependency Injection
Jobs support constructor injection:
public class GenerateInvoiceJob : IAsyncJob<InvoiceJobData>
{
private readonly IBillingService _billingService;
private readonly ILogger<GenerateInvoiceJob> _logger;
public GenerateInvoiceJob(IBillingService billingService, ILogger<GenerateInvoiceJob> logger)
{
_logger = billingService;
_logger = logger;
}
public async Task ExecuteAsync(IJobContext context)
{
var data = context.GetData<InvoiceJobData>();
await billingService.SendAsync(data.To, data.Subject, data.Body);
context.LogInformation("Invoice successfully generated!");
}
}
Register services in Program.cs:
var builder = Host.CreateApplicationBuilder(args);
// Register your services
builder.Services.AddScoped<IBillingService, InvoiceService>();
// Register Worker SDK
builder.Services.AddMilvaionWorkerWithJobs(builder.Configuration);
var host = builder.Build();
await host.RunAsync();