Version: 1.0.1

SQL Worker

The SQL Worker is a powerful, multi-database worker that can execute SQL queries and stored procedures against PostgreSQL, SQL Server, and MySQL databases. It uses Dapper for high-performance database operations with full parameterized query support.

Features

  • Multi-database support (PostgreSQL, SQL Server, MySQL)
  • Parameterized queries (SQL injection protection)
  • Stored procedure execution
  • Transaction support with configurable isolation levels
  • Three query types (NonQuery, Scalar, Reader)
  • Connection pooling via named connections
  • Result set limiting
  • Automatic JSON result formatting

Use Cases

| Scenario | Example |
| --- | --- |
| Scheduled Reports | Generate daily/weekly reports from database |
| Data Cleanup | Periodic cleanup of old records |
| Data Aggregation | Calculate statistics and store results |
| Batch Processing | Process queued records in batches |
| Database Maintenance | Run maintenance scripts on schedule |
| ETL Jobs | Extract, transform, load operations |

Security Model

For security, database connection strings are never included in job data. Instead:

  1. Worker Configuration: Connection strings are configured in the worker's appsettings.json
  2. Job Data: Jobs reference connections by alias name only
  3. UI Integration: Available connection aliases are exposed through the job data definition, so the UI can offer them as choices without revealing credentials
┌─────────────────────────────────────────────────────────┐
│ Worker appsettings.json │
│ ┌───────────────────────────────────────────────────┐ │
│ │ "ExecutorConfig": { │ │
│ │ "Connections": { │ │
│ │ "MainDatabase": { │ │
│ │ "ConnectionString": "Server=...;Pass=...", │ │ ← Secrets stay here
│ │ "Provider": "PostgreSql" │ │
│ │ } │ │
│ │ } │ │
│ │ } │ │
│ └───────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘

                            │ Only the alias is used
                            ▼

┌─────────────────────────────────────────────────────────┐
│ Job Data (from API/UI) │
│ ┌───────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ "connectionName": "MainDatabase", ← Alias │ │
│ │ "query": "SELECT * FROM Users WHERE Id = @Id", │ │
│ │ "parameters": { "Id": 123 } │ │
│ │ } │ │
│ └───────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘

Worker Configuration

Configure database connections in the worker's appsettings.json:

```json
{
  "ExecutorConfig": {
    "Connections": {
      "MainDatabase": {
        "ConnectionString": "Host=localhost;Port=5432;Database=mydb;Username=user;Password=secret;",
        "Provider": "PostgreSql",
        "DefaultTimeoutSeconds": 30
      },
      "ReportingDatabase": {
        "ConnectionString": "Host=reporting-db;Port=5432;Database=reports;Username=report_user;Password=report_pass;",
        "Provider": "PostgreSql",
        "DefaultTimeoutSeconds": 60
      },
      "LegacySqlServer": {
        "ConnectionString": "Server=sql-server;Database=LegacyDb;User Id=sa;Password=pass;TrustServerCertificate=true;",
        "Provider": "SqlServer",
        "DefaultTimeoutSeconds": 30
      },
      "MySqlDatabase": {
        "ConnectionString": "Server=mysql-host;Database=mydb;User=root;Password=pass;",
        "Provider": "MySql",
        "DefaultTimeoutSeconds": 30
      }
    }
  }
}
```

Connection Properties

| Property | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| ConnectionString | string | Yes | - | Database connection string |
| Provider | enum | Yes | - | Database provider: SqlServer, PostgreSql, MySql |
| DefaultTimeoutSeconds | number | No | 30 | Default command timeout for this connection |

Job Data Schema

When creating a job with the SQL Worker, provide the query configuration through Job Data JSON:

```json
{
  "connectionName": "MainDatabase",
  "query": "SELECT * FROM Users WHERE Status = @Status AND CreatedAt > @Since",
  "parameters": {
    "Status": "Active",
    "Since": "2024-01-01"
  },
  "commandType": "Text",
  "queryType": "Reader",
  "timeoutSeconds": 60,
  "maxRows": 1000
}
```

Configuration Reference

Main Properties

| Property | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connectionName | string | Yes | - | Connection alias from worker configuration |
| query | string | Yes | - | SQL query or stored procedure name |
| parameters | object | No | - | Query parameters as key-value pairs |
| commandType | enum | No | Text | Command type: Text or StoredProcedure |
| queryType | enum | No | NonQuery | Query type: NonQuery, Scalar, or Reader |
| timeoutSeconds | number | No | 0 | Command timeout (0 = use connection default) |
| useTransaction | boolean | No | false | Wrap execution in a transaction |
| isolationLevel | enum | No | ReadCommitted | Transaction isolation level |
| maxRows | number | No | 0 | Maximum rows to return (0 = unlimited) |

Query Types

| Type | Description | Result Format |
| --- | --- | --- |
| NonQuery | INSERT, UPDATE, DELETE statements | `{ "affectedRows": 5 }` |
| Scalar | Returns first column of first row | `{ "value": 42 }` |
| Reader | Returns full result set as JSON array | `{ "data": [...], "rowCount": 10 }` |

Command Types

| Type | Description | Example |
| --- | --- | --- |
| Text | Raw SQL query (default) | `SELECT * FROM Users` |
| StoredProcedure | Execute stored procedure | `sp_GetUserById` |

Transaction Isolation Levels

| Level | Description |
| --- | --- |
| ReadUncommitted | Allows dirty reads |
| ReadCommitted | Default; prevents dirty reads |
| RepeatableRead | Prevents non-repeatable reads |
| Serializable | Highest isolation; prevents phantom reads |
| Snapshot | Row versioning (SQL Server specific) |

Practical Examples

Example 1: Simple SELECT Query

```json
{
  "connectionName": "MainDatabase",
  "query": "SELECT Id, Name, Email FROM Users WHERE IsActive = true",
  "queryType": "Reader",
  "maxRows": 100
}
```

Result:

```json
{
  "queryType": "Reader",
  "rowCount": 3,
  "data": [
    { "id": 1, "name": "John Doe", "email": "[email protected]" },
    { "id": 2, "name": "Jane Smith", "email": "[email protected]" },
    { "id": 3, "name": "Bob Wilson", "email": "[email protected]" }
  ],
  "success": true
}
```

Example 2: Parameterized INSERT

```json
{
  "connectionName": "MainDatabase",
  "query": "INSERT INTO AuditLogs (Action, UserId, Timestamp, Details) VALUES (@Action, @UserId, @Timestamp, @Details)",
  "parameters": {
    "Action": "UserLogin",
    "UserId": 123,
    "Timestamp": "2024-01-15T10:30:00Z",
    "Details": "Login from IP 192.168.1.100"
  },
  "queryType": "NonQuery"
}
```

Result:

```json
{
  "queryType": "NonQuery",
  "affectedRows": 1,
  "success": true
}
```

Example 3: Scalar Query (Count)

```json
{
  "connectionName": "ReportingDatabase",
  "query": "SELECT COUNT(*) FROM Orders WHERE OrderDate >= @StartDate AND OrderDate < @EndDate",
  "parameters": {
    "StartDate": "2024-01-01",
    "EndDate": "2024-02-01"
  },
  "queryType": "Scalar"
}
```

Result:

```json
{
  "queryType": "Scalar",
  "value": 1547,
  "success": true
}
```

Example 4: Stored Procedure

```json
{
  "connectionName": "LegacySqlServer",
  "query": "sp_GenerateMonthlyReport",
  "commandType": "StoredProcedure",
  "parameters": {
    "Year": 2024,
    "Month": 1,
    "DepartmentId": 5
  },
  "queryType": "Reader",
  "timeoutSeconds": 120
}
```

Example 5: Transaction with UPDATE

```json
{
  "connectionName": "MainDatabase",
  "query": "UPDATE Accounts SET Balance = Balance - @Amount WHERE AccountId = @FromAccount; UPDATE Accounts SET Balance = Balance + @Amount WHERE AccountId = @ToAccount;",
  "parameters": {
    "Amount": 100.00,
    "FromAccount": "ACC-001",
    "ToAccount": "ACC-002"
  },
  "queryType": "NonQuery",
  "useTransaction": true,
  "isolationLevel": "Serializable"
}
```

Example 6: Batch Delete with Limit

```json
{
  "connectionName": "MySqlDatabase",
  "query": "DELETE FROM TempData WHERE CreatedAt < @CutoffDate LIMIT 10000",
  "parameters": {
    "CutoffDate": "2023-01-01"
  },
  "queryType": "NonQuery",
  "timeoutSeconds": 300
}
```
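Note that `DELETE ... LIMIT` is MySQL syntax; PostgreSQL does not accept a `LIMIT` clause on `DELETE`. The same batching pattern can be expressed on PostgreSQL with a subquery, assuming the table has a primary key column (`Id` here is illustrative):

```json
{
  "connectionName": "MainDatabase",
  "query": "DELETE FROM TempData WHERE Id IN (SELECT Id FROM TempData WHERE CreatedAt < @CutoffDate LIMIT 10000)",
  "parameters": {
    "CutoffDate": "2023-01-01"
  },
  "queryType": "NonQuery",
  "timeoutSeconds": 300
}
```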

Example 7: PostgreSQL-Specific (RETURNING)

```json
{
  "connectionName": "MainDatabase",
  "query": "INSERT INTO Users (Name, Email) VALUES (@Name, @Email) RETURNING Id, CreatedAt",
  "parameters": {
    "Name": "New User",
    "Email": "[email protected]"
  },
  "queryType": "Reader"
}
```

Result:

```json
{
  "queryType": "Reader",
  "rowCount": 1,
  "data": [
    { "id": 42, "createdAt": "2024-01-15T10:30:00Z" }
  ],
  "success": true
}
```

Error Handling

The SQL Worker distinguishes between permanent and transient errors:

Permanent Errors (No Retry)

These errors go directly to the Dead Letter Queue (DLQ):

| Error | Description |
| --- | --- |
| Syntax Error | Invalid SQL syntax |
| Permission Denied | Insufficient database permissions |
| Object Not Found | Table, column, or procedure doesn't exist |
| Constraint Violation | Unique, foreign key, or check constraint violated |
| Invalid Connection Name | Connection alias not found in configuration |
| Missing Required Fields | connectionName or query not provided |

Transient Errors (Retryable)

These errors are retried according to the job consumer's retry policy:

| Error | Description |
| --- | --- |
| Connection Timeout | Database connection timeout |
| Command Timeout | Query execution timeout |
| Connection Lost | Network interruption during execution |
| Deadlock | Transaction deadlock (database will retry) |
| Resource Busy | Database server overloaded |
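A transient/permanent split like the one above typically drives a retry loop with exponential backoff on the consumer side. The sketch below is illustrative only (the actual retry policy lives in the job consumer, and the category names are taken from the tables above):

```python
import time

# Error categories treated as retryable, per the transient-errors table above.
TRANSIENT = {"ConnectionTimeout", "CommandTimeout", "ConnectionLost", "Deadlock", "ResourceBusy"}

def run_with_retry(execute, classify, max_attempts=3, base_delay=1.0):
    """Retry transient failures with exponential backoff; re-raise permanent ones immediately."""
    for attempt in range(1, max_attempts + 1):
        try:
            return execute()
        except Exception as exc:
            if classify(exc) not in TRANSIENT or attempt == max_attempts:
                raise  # permanent error or retries exhausted -> goes to the DLQ
            time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
```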

Job Result

After successful execution, the job stores a result based on query type:

NonQuery Result

```json
{
  "queryType": "NonQuery",
  "affectedRows": 5,
  "success": true
}
```

Scalar Result

```json
{
  "queryType": "Scalar",
  "value": 42,
  "success": true
}
```

Reader Result

```json
{
  "queryType": "Reader",
  "rowCount": 10,
  "data": [
    { "column1": "value1", "column2": 123 },
    ...
  ],
  "success": true
}
```
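Since Reader results are plain JSON, downstream consumers can process them with standard tooling. A minimal sketch (the result payload here follows the Reader shape above; how you fetch the stored result from the job system is up to your setup):

```python
import json

# Example Reader result, shaped like the "Reader Result" section above.
raw = '{"queryType": "Reader", "rowCount": 2, "data": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}], "success": true}'

result = json.loads(raw)
# Only trust the data array on a successful Reader execution.
rows = result["data"] if result["success"] and result["queryType"] == "Reader" else []
names = [row["name"] for row in rows]
```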

Deployment

The SQL Worker can be deployed as a Docker container:

```yaml
# docker-compose.yml
services:
  sql-worker:
    image: milvasoft/milvaion-sqlworker:latest
    environment:
      - Worker__WorkerId=sql-worker-01
      - Worker__RabbitMQ__Host=rabbitmq
      - Worker__RabbitMQ__Port=5672
      - Worker__RabbitMQ__Username=guest
      - Worker__RabbitMQ__Password=guest
      - Worker__MaxParallelJobs=16
      # Connection strings should use secrets in production
      - ExecutorConfig__Connections__MilvaionDatabase__ConnectionString=Host=postgres;Database=mydb;Username=user;Password=secret
      - ExecutorConfig__Connections__MilvaionDatabase__Provider=PostgreSql
```
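The double underscores follow standard .NET configuration binding: each `__` in an environment variable name descends one level of the configuration tree. The two `ExecutorConfig__...` variables above therefore bind to the same structure as this appsettings.json fragment:

```json
{
  "ExecutorConfig": {
    "Connections": {
      "MilvaionDatabase": {
        "ConnectionString": "Host=postgres;Database=mydb;Username=user;Password=secret",
        "Provider": "PostgreSql"
      }
    }
  }
}
```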

Security Note: In production, use Docker secrets or a secrets manager (Azure Key Vault, AWS Secrets Manager, HashiCorp Vault) for connection strings.

Best Practices

  1. Use Parameterized Queries

    • Never concatenate user input into SQL strings
    • Always use the parameters object for dynamic values
  2. Set Appropriate Timeouts

    • Match timeout to expected query duration
    • Long-running reports should have higher timeouts
  3. Limit Result Sets

    • Use maxRows to prevent memory issues with large datasets
    • Consider pagination for large data exports
  4. Use Transactions Wisely

    • Enable transactions for multi-statement operations
    • Choose appropriate isolation level for your use case
  5. Monitor Execution

    • Check execution logs for slow queries
    • Set up alerts for repeated failures
  6. Connection Management

    • Define separate connections for different workloads (OLTP vs. reporting)
    • Use read replicas for heavy read operations

Supported Databases

| Database | Provider | Connection String Format |
| --- | --- | --- |
| PostgreSQL | PostgreSql | `Host=host;Port=5432;Database=db;Username=user;Password=pass;` |
| SQL Server | SqlServer | `Server=server;Database=db;User Id=user;Password=pass;TrustServerCertificate=true;` |
| MySQL/MariaDB | MySql | `Server=host;Database=db;User=user;Password=pass;` |

For custom workers, see Your First Worker and Implementing Jobs.