5.13 Implementation patterns reference

This page collects the cross-cutting implementation patterns that every service in the platform relies on. Each is described in enough detail that an engineer can implement it from scratch.

Java / Spring Boot syntax is used throughout; the patterns translate to other stacks.

Every mutating external API call must be retry-safe. Network failures cause clients to retry; if the server has already processed the original call but not yet returned, the retry creates a duplicate side effect.

In lending, duplicate disbursements, duplicate repayment recordings, and duplicate event emissions are catastrophic.

The client sends an X-Idempotency-Key header with every mutating request. The server:

  1. Computes key_hash = SHA-256(idempotency_key || endpoint || principal).
  2. Looks up the idempotency_response table by key_hash.
  3. If hit: returns the cached response (HTTP status + body).
  4. If miss: processes the request; before responding, stores the response by key_hash.
CREATE TABLE idempotency_response (
key_hash CHAR(64) PRIMARY KEY,
endpoint VARCHAR(255) NOT NULL,
actor_id BIGINT NOT NULL,
response_status INT NOT NULL,
response_body JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idempotency_expires_idx ON idempotency_response(expires_at);
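
Step 1 concatenates three fields before hashing. If the fields are joined without an unambiguous separator, two different (key, endpoint, principal) triples can produce the same input string. The following is a minimal sketch of one way to avoid that, assuming length-prefixed fields; the class name IdempotencyKeyHash and the prefix format are illustrative, not part of the platform.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Hypothetical helper: hashes the three fields with a length prefix on each.
final class IdempotencyKeyHash {
    private IdempotencyKeyHash() {}

    /** SHA-256 over length-prefixed fields, returned as 64 lowercase hex characters. */
    static String of(String idempotencyKey, String endpoint, String principal) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (String field : new String[] {idempotencyKey, endpoint, principal}) {
                byte[] bytes = field.getBytes(StandardCharsets.UTF_8);
                // The length prefix keeps ("ab", "c") distinct from ("a", "bc").
                md.update(Integer.toString(bytes.length).getBytes(StandardCharsets.UTF_8));
                md.update((byte) ':');
                md.update(bytes);
            }
            return HexFormat.of().formatHex(md.digest());
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }
}
```

The result always fits the CHAR(64) key_hash column, and the same triple always yields the same hash, which is what the lookup in step 2 relies on.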

Retention is typically 7 days; a scheduled (cron) job deletes rows whose expires_at has passed.

@Component
@Order(Ordered.HIGHEST_PRECEDENCE + 10)
public class IdempotencyFilter implements Filter {
private final IdempotencyRepository repo;
private final ObjectMapper objectMapper;
@Override
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
throws IOException, ServletException {
var request = (HttpServletRequest) req;
var response = (HttpServletResponse) res;
if (!isMutating(request)) { chain.doFilter(req, res); return; }
String idempotencyKey = request.getHeader("X-Idempotency-Key");
if (idempotencyKey == null) {
// Strict mode: reject. Some legacy paths may instead pass the request through for backward compatibility.
response.sendError(400, "X-Idempotency-Key required");
return;
}
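// Assumes authentication has already run; if this filter is ordered before the security filter chain, getAuthentication() is null here.
String actor = SecurityContextHolder.getContext().getAuthentication().getName();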
String keyHash = sha256(idempotencyKey + ":" + request.getRequestURI() + ":" + actor);
var existing = repo.findByKeyHash(keyHash);
if (existing.isPresent()) {
// Return cached response
var cached = existing.get();
response.setStatus(cached.getResponseStatus());
response.getWriter().write(cached.getResponseBody());
return;
}
// Wrap response so we can capture body
var wrapper = new ContentCachingResponseWrapper(response);
chain.doFilter(req, wrapper);
// Read the body before copyBodyToResponse(), which resets the cached content
var bodyStr = new String(wrapper.getContentAsByteArray(), StandardCharsets.UTF_8);
wrapper.copyBodyToResponse();
// Cache for future replays
repo.save(new IdempotencyResponse(keyHash, request.getRequestURI(), actor,
wrapper.getStatus(), bodyStr, Instant.now(), Instant.now().plus(Duration.ofDays(7))));
}
private boolean isMutating(HttpServletRequest req) {
return Set.of("POST","PUT","PATCH","DELETE").contains(req.getMethod());
}
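private String sha256(String s) {
try {
var md = MessageDigest.getInstance("SHA-256");
return HexFormat.of().formatHex(md.digest(s.getBytes(StandardCharsets.UTF_8)));
} catch (NoSuchAlgorithmException e) {
// SHA-256 is mandatory on every Java platform, so this is not expected
throw new IllegalStateException(e);
}
}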
}
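
The filter above looks up the key and only stores the response after processing, so two concurrent requests carrying the same key can both miss the lookup and both run the handler. One way to close that gap is to claim the key atomically before processing. The sketch below shows the idea with an in-memory map standing in for a unique-key insert on idempotency_response. The class IdempotencyStore, the in-progress marker and the 409 reply are assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for the idempotency_response table.
final class IdempotencyStore {
    /** Cached outcome of a completed request. */
    record Response(int status, String body) {}

    // Marker stored while the first request is still being processed.
    private static final Response IN_PROGRESS = new Response(-1, "");

    private final ConcurrentHashMap<String, Response> byKeyHash = new ConcurrentHashMap<>();

    Response execute(String keyHash, Supplier<Response> handler) {
        // putIfAbsent is atomic: exactly one caller per key sees null and runs the handler.
        Response prior = byKeyHash.putIfAbsent(keyHash, IN_PROGRESS);
        if (prior == IN_PROGRESS) {
            return new Response(409, "{\"error\":\"request in progress\"}");
        }
        if (prior != null) {
            return prior; // replay: return the cached response
        }
        try {
            Response result = handler.get();
            byKeyHash.put(keyHash, result);
            return result;
        } catch (RuntimeException e) {
            // Release the claim so the client can retry after a failure.
            byKeyHash.remove(keyHash, IN_PROGRESS);
            throw e;
        }
    }
}
```

In a database-backed version the claim would be an insert that fails on the key_hash primary key, in the same way the consumer table below uses its primary key to detect duplicates.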

Event consumers apply the same pattern, with the seen keys stored in a dedicated table:

CREATE TABLE consumer_seen_keys (
consumer_name VARCHAR(255) NOT NULL,
topic VARCHAR(255) NOT NULL,
idempotency_key VARCHAR(255) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (consumer_name, topic, idempotency_key)
);

Consumer code:

@KafkaListener(topics = "loan.activated")
public void handle(LoanActivatedEvent event, @Header("idempotency_key") String key) {
try {
seenKeysRepo.save(new SeenKey("disbursement-confirmer", "loan.activated", key, Instant.now()));
} catch (DuplicateKeyException e) {
// Already processed
return;
}
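// Process the event. This must commit atomically with the seen-key insert above;
// otherwise a failure here leaves the key recorded and the redelivered event is skipped.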
processLoanActivation(event);
}

PAN, Aadhaar, mobile number, email and bank account details are highly sensitive. Storing them in clear text in the main DB exposes the lender to large-scale data loss if that DB is breached. The pattern: store only tokens in the main DB and keep the clear text in a separate vault with strict access controls.

A separate service with its own DB:

CREATE SCHEMA vault;
CREATE TABLE vault.secret (
token VARCHAR(128) PRIMARY KEY, -- 'tok_' || secret_type || '_' || UUID, so longer than a bare 36-char UUID
plaintext_encrypted BYTEA NOT NULL, -- encrypted with KMS data key
secret_type VARCHAR(64) NOT NULL, -- 'pan', 'aadhaar', 'mobile', etc.
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_by VARCHAR(255) NOT NULL
);
CREATE TABLE vault.access_log (
id BIGSERIAL PRIMARY KEY,
token VARCHAR(128) NOT NULL,
operation VARCHAR(32) NOT NULL, -- 'tokenize', 'detokenize'
requested_by VARCHAR(255) NOT NULL,
request_ip INET,
purpose VARCHAR(255) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT now()
);

  • POST /v1/vault/tokenize with body { "type": "pan", "value": "ABCDE1234F" } returns { "token": "tok_pan_..." }.
  • POST /v1/vault/detokenize with body { "token": "tok_pan_...", "purpose": "kfs_generation" } returns { "type": "pan", "value": "ABCDE1234F" }. Logged.
  • POST /v1/vault/mask with body { "token": "tok_pan_..." } returns { "masked": "ABCDE****F" }. For display.

@Service
public class VaultService {
private final SecretRepository repo;
private final AccessLogRepository accessLogRepo; // used by detokenize below
private final AwsCrypto crypto; // AWS Encryption SDK
private final KmsMasterKeyProvider kmsProvider;
public String tokenize(String secretType, String plaintext, String actor) {
var encrypted = crypto.encryptData(kmsProvider, plaintext.getBytes(UTF_8)).getResult();
var token = "tok_" + secretType + "_" + UUID.randomUUID();
repo.save(new Secret(token, encrypted, secretType, Instant.now(), actor));
return token;
}
public String detokenize(String token, String purpose, String actor, String requestIp) {
var record = repo.findById(token).orElseThrow();
accessLogRepo.save(new AccessLog(token, "detokenize", actor, requestIp, purpose, Instant.now()));
var decrypted = crypto.decryptData(kmsProvider, record.getPlaintextEncrypted()).getResult();
return new String(decrypted, UTF_8);
}
}

Every detokenize call must be:

  • Authenticated (service or user caller).
  • Authorised (RBAC: does this caller have permission to detokenize this type for this purpose?).
  • Logged.
  • Covered by a periodic review of the detokenize logs for anomalies.
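
The authorisation check depends on three things: who is calling, which secret type is requested, and for what purpose. A minimal sketch of such a check follows, assuming grants are held as a nested map. The class DetokenizePolicy and the example caller name are hypothetical; a real deployment would load grants from its RBAC system.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical purpose-based policy: caller -> secret type -> allowed purposes.
final class DetokenizePolicy {
    private final Map<String, Map<String, Set<String>>> grants;

    DetokenizePolicy(Map<String, Map<String, Set<String>>> grants) {
        this.grants = grants;
    }

    /** Deny by default: only an explicit (caller, type, purpose) grant returns true. */
    boolean isAllowed(String caller, String secretType, String purpose) {
        if (caller == null || secretType == null || purpose == null) {
            return false;
        }
        return grants.getOrDefault(caller, Map.of())
                     .getOrDefault(secretType, Set.of())
                     .contains(purpose);
    }
}
```

A vault service would call isAllowed before decrypting and reject the request when it returns false, ideally still writing the denied attempt to the access log.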

For display in dashboards / logs:

public String maskPan(String token) {
String clear = vault.detokenize(token, "mask", "system", null);
// Keep the first five characters and the last one, e.g. ABCDE1234F -> ABCDE****F
return clear.substring(0, 5) + "****" + clear.substring(clear.length() - 1);
}
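
Fixed substring offsets throw on short input and only suit one secret type. A small generic helper, sketched here under the assumption that each type just needs "keep N leading and M trailing characters", covers PAN and mobile numbers alike. The name Masking.maskMiddle is illustrative.

```java
// Hypothetical masking helper shared across secret types.
final class Masking {
    private Masking() {}

    /**
     * Keeps keepStart leading and keepEnd trailing characters and replaces the rest with '*'.
     * If the value is too short to keep anything safely, every character is masked.
     */
    static String maskMiddle(String value, int keepStart, int keepEnd) {
        if (value == null) {
            return "";
        }
        int n = value.length();
        if (keepStart < 0 || keepEnd < 0 || keepStart + keepEnd >= n) {
            return "*".repeat(n);
        }
        return value.substring(0, keepStart)
                + "*".repeat(n - keepStart - keepEnd)
                + value.substring(n - keepEnd);
    }
}
```

For example, Masking.maskMiddle("ABCDE1234F", 5, 1) gives the ABCDE****F form shown for the mask endpoint.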

Audit logs without integrity guarantees can be tampered with by a privileged insider. Hash-chaining makes tampering detectable.

CREATE TABLE audit_event (
id BIGSERIAL PRIMARY KEY,
actor_user_id BIGINT,
actor_service_id VARCHAR(255),
action VARCHAR(255) NOT NULL,
resource_type VARCHAR(64) NOT NULL,
resource_id VARCHAR(64) NOT NULL,
before_state_json JSONB,
after_state_json JSONB,
request_ip INET,
request_user_agent TEXT,
timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
prev_hash CHAR(64) NOT NULL,
this_hash CHAR(64) NOT NULL
);
CREATE INDEX audit_resource_idx ON audit_event(resource_type, resource_id);
CREATE INDEX audit_actor_idx ON audit_event(actor_user_id, timestamp);
-- digest() is provided by the pgcrypto extension
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE OR REPLACE FUNCTION audit_event_hash()
RETURNS TRIGGER AS $$
DECLARE
last_hash CHAR(64);
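BEGIN
-- Serialise inserts so two concurrent transactions cannot read the same last_hash and fork the chain.
-- The lock key is an arbitrary constant; the lock is held until the inserting transaction ends.
PERFORM pg_advisory_xact_lock(5130001);
SELECT this_hash INTO last_hash
FROM audit_event
ORDER BY id DESC LIMIT 1;
IF last_hash IS NULL THEN
last_hash := repeat('0', 64);
END IF;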
NEW.prev_hash := last_hash;
NEW.this_hash := encode(
digest(
last_hash ||
COALESCE(NEW.actor_user_id::text,'') ||
COALESCE(NEW.actor_service_id,'') ||
NEW.action ||
NEW.resource_type ||
NEW.resource_id ||
NEW.timestamp::text ||
COALESCE(NEW.before_state_json::text,'') ||
COALESCE(NEW.after_state_json::text,''),
'sha256'
),
'hex'
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER audit_event_hash_chain BEFORE INSERT ON audit_event
FOR EACH ROW EXECUTE FUNCTION audit_event_hash();

Runs daily:

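-- Verify the chain
WITH chain AS (
SELECT id, prev_hash, this_hash,
encode(digest(
-- COALESCE supplies the all-zero genesis hash so the first row is verified too
COALESCE(LAG(this_hash) OVER (ORDER BY id), repeat('0', 64)) || ... -- same input as trigger
, 'sha256'), 'hex') AS expected_hash
FROM audit_event
)
-- IS DISTINCT FROM treats NULL as a mismatch instead of silently skipping the row
SELECT id FROM chain WHERE this_hash IS DISTINCT FROM expected_hash;
-- Any rows returned = tampering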

If any row mismatches, alert immediately.
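
To make the detection property concrete, the following sketch builds a small chain in memory and verifies it. It is an illustration of the technique, not the platform's verifier: each entry carries a single payload string where the trigger hashes several columns, and the names HashChain, Entry and firstBrokenIndex are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;

// Hypothetical in-memory model of the audit_event hash chain.
final class HashChain {
    record Entry(String payload, String prevHash, String thisHash) {}

    // Same genesis value the trigger uses for the first row.
    static final String GENESIS = "0".repeat(64);

    static String sha256Hex(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(md.digest(s.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Appends an entry whose hash covers the previous hash plus its own payload. */
    static Entry append(List<Entry> chain, String payload) {
        String prev = chain.isEmpty() ? GENESIS : chain.get(chain.size() - 1).thisHash();
        Entry entry = new Entry(payload, prev, sha256Hex(prev + payload));
        chain.add(entry);
        return entry;
    }

    /** Returns the index of the first entry that breaks the chain, or -1 if it is intact. */
    static int firstBrokenIndex(List<Entry> chain) {
        String expectedPrev = GENESIS;
        for (int i = 0; i < chain.size(); i++) {
            Entry e = chain.get(i);
            boolean linkOk = e.prevHash().equals(expectedPrev);
            boolean hashOk = e.thisHash().equals(sha256Hex(e.prevHash() + e.payload()));
            if (!linkOk || !hashOk) {
                return i;
            }
            expectedPrev = e.thisHash();
        }
        return -1;
    }
}
```

Editing a payload without touching its hash is caught at that entry; recomputing the entry's hash as well only moves the mismatch to the next entry, whose prev_hash no longer matches. An attacker would have to rewrite every later row, which is why the verification job compares the whole chain.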

@Aspect
@Component
public class AuditAspect {
private final AuditRepository repo;
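// The pointcut must name the method parameter ("annotation") so the @Audited instance is bound to it
@AfterReturning(value = "@annotation(annotation)", returning = "result")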
public void audit(JoinPoint jp, Audited annotation, Object result) {
var event = new AuditEvent();
event.setAction(annotation.action());
event.setResourceType(annotation.resourceType());
event.setResourceId(extractResourceId(jp, result));
event.setActorUserId(SecurityContext.currentUserId());
// before / after captured via annotation processing
repo.save(event);
}
}

Many vendors (bureau, KYC, AA, BSA, GST, eSign, NACH, etc.) need the same treatment: auth, retry, circuit-breaker, timeout, audit, observability. Building this separately for each vendor duplicates the boilerplate.

A base adapter interface and class:

public interface VendorAdapter<REQ, RES> {
Vendor vendor();
String operation();
RES call(REQ request, Map<String, String> metadata);
}
@RequiredArgsConstructor
@Slf4j
public abstract class BaseVendorAdapter<REQ, RES> implements VendorAdapter<REQ, RES> {
private final RetryRegistry retryRegistry;
private final CircuitBreakerRegistry cbRegistry;
private final MeterRegistry meterRegistry;
private final VendorCallLogRepository callLog;
private final AuditService auditService;
@Override
public final RES call(REQ request, Map<String, String> metadata) {
var traceId = MDC.get("traceId");
var idempotencyKey = metadata.getOrDefault("idempotency_key", UUID.randomUUID().toString());
// Pre-call audit
long startNanos = System.nanoTime();
var callRecord = callLog.startCall(vendor(), operation(), idempotencyKey, traceId);
try {
var cb = cbRegistry.circuitBreaker(vendor().name());
var retry = retryRegistry.retry(vendor().name());
RES result = Retry.decorateSupplier(retry,
CircuitBreaker.decorateSupplier(cb, () -> doCall(request))
).get();
// Post-call audit
callLog.completeCall(callRecord.getId(), "success", null);
meterRegistry.timer("vendor.call.latency",
"vendor", vendor().name(), "operation", operation(), "outcome", "success")
.record(Duration.ofNanos(System.nanoTime() - startNanos));
return result;
} catch (Exception e) {
callLog.completeCall(callRecord.getId(), "failed", e.getMessage());
meterRegistry.timer("vendor.call.latency",
"vendor", vendor().name(), "operation", operation(), "outcome", "failed")
.record(Duration.ofNanos(System.nanoTime() - startNanos));
throw mapException(e);
}
}
protected abstract RES doCall(REQ request);
protected abstract VendorException mapException(Exception e);
}
@Component
public class KarzaPanVerifyAdapter extends BaseVendorAdapter<PanVerifyRequest, PanVerifyResponse> {
private final KarzaClient karzaClient;
@Override
public Vendor vendor() { return Vendor.KARZA; }
@Override
public String operation() { return "pan.verify"; }
@Override
protected PanVerifyResponse doCall(PanVerifyRequest request) {
var raw = karzaClient.verifyPan(request.pan(), request.name(), request.dob());
return PanVerifyResponse.fromKarza(raw);
}
@Override
protected VendorException mapException(Exception e) {
if (e instanceof KarzaApiException kae) {
return switch (kae.getErrorCode()) {
case "INVALID_PAN" -> new VendorValidationException("Invalid PAN format");
case "RATE_LIMITED" -> new VendorRateLimitedException();
default -> new VendorException("Karza error: " + kae.getMessage(), e);
};
}
return new VendorException("Unexpected: " + e.getMessage(), e);
}
}
@Service
public class PanVerifyService {
private final List<VendorAdapter<PanVerifyRequest, PanVerifyResponse>> adapters; // ordered primary → fallback
private final VendorRoutingConfig routingConfig;
public PanVerifyResponse verify(PanVerifyRequest req) {
var enabledAdapters = routingConfig.getEnabledFor("pan.verify");
for (var adapter : enabledAdapters) {
try {
return adapter.call(req, Map.of());
} catch (VendorRateLimitedException | VendorCircuitOpenException e) {
continue; // try next
}
}
throw new VendorException("All vendors failed for pan.verify");
}
}
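
The routing loop above falls through to the next vendor only for rate-limit and circuit-open failures, and discards the reason for each skipped vendor. The sketch below isolates that decision in plain Java: transient failures fall through, anything else propagates at once, and the last transient failure is kept as the cause. FallbackRouter and TransientVendorException are hypothetical names standing in for the adapter list and the two exception types caught above.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical, library-free model of primary-to-fallback vendor routing.
final class FallbackRouter {
    private FallbackRouter() {}

    /** Stand-in for failures worth trying another vendor for (rate limit, open circuit). */
    static class TransientVendorException extends RuntimeException {
        TransientVendorException(String message) {
            super(message);
        }
    }

    /**
     * Calls adapters in order and returns the first successful result.
     * Only TransientVendorException moves on to the next adapter; any other
     * exception (for example a validation error) propagates immediately.
     */
    static <REQ, RES> RES callFirstAvailable(List<Function<REQ, RES>> adaptersInOrder, REQ request) {
        TransientVendorException last = null;
        for (Function<REQ, RES> adapter : adaptersInOrder) {
            try {
                return adapter.apply(request);
            } catch (TransientVendorException e) {
                last = e; // remember why this vendor was skipped
            }
        }
        throw new IllegalStateException("All vendors failed", last);
    }
}
```

Keeping the last failure as the cause makes the final error actionable in logs, since it shows whether the fallback chain ended on a rate limit or an open circuit.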

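Publishing an event to a message bus alongside a DB write is a dual write: the bus does not take part in the DB transaction, so if either side fails the system is left inconsistent. The outbox pattern decouples the write from the publish: the event is written to an outbox table in the same transaction as the business change, and a separate publisher relays it to the bus afterwards.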

CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(16) NOT NULL DEFAULT 'pending', -- pending, published, failed
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ,
attempts INT NOT NULL DEFAULT 0
);
CREATE INDEX outbox_status_idx ON outbox(status, id);
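@Transactional(rollbackFor = Exception.class) // also roll back on the checked JsonProcessingException
public void activateLoan(Long loanId, BigDecimal initialPrincipal) throws JsonProcessingException {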
// Business write
loanRepo.activate(loanId, initialPrincipal);
// Outbox write — same transaction
var event = new LoanActivatedEvent(loanId, initialPrincipal, Instant.now());
outboxRepo.save(new Outbox("loan", String.valueOf(loanId), "loan.activated",
objectMapper.writeValueAsString(event), "pending"));
}
@Component
public class OutboxPublisher {
private final OutboxRepository repo;
private final KafkaTemplate<String, String> kafka;
@Scheduled(fixedDelay = 1000)
@Transactional // findPendingForUpdate takes row locks, which are only held inside a transaction
public void publish() {
var pending = repo.findPendingForUpdate(PageRequest.of(0, 100));
for (var msg : pending) {
try {
var topic = msg.getEventType();
var key = msg.getAggregateId();
kafka.send(topic, key, msg.getPayload()).get(5, TimeUnit.SECONDS);
repo.markPublished(msg.getId());
} catch (Exception e) {
repo.markFailed(msg.getId(), e.getMessage());
}
}
}
}
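
The outbox table has an attempts column, which suggests that a failed publish should be retried before the row is given up on. The following in-memory sketch shows one such policy: a row stays pending until it is published or until a maximum number of attempts is reached. OutboxRelay, Row and the maxAttempts parameter are assumptions for illustration; ordering between a failed row and later rows is not addressed here.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of the outbox relay loop.
final class OutboxRelay {
    enum Status { PENDING, PUBLISHED, FAILED }

    /** Mirrors the id, payload, status and attempts columns of the outbox table. */
    static final class Row {
        final long id;
        final String payload;
        Status status = Status.PENDING;
        int attempts = 0;

        Row(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    private final int maxAttempts;

    OutboxRelay(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Tries every pending row once; returns how many were published in this pass. */
    int drain(List<Row> outbox, Consumer<String> publisher) {
        int published = 0;
        for (Row row : outbox) {
            if (row.status != Status.PENDING) {
                continue;
            }
            row.attempts++;
            try {
                publisher.accept(row.payload);
                // Marked only after the send succeeded, so delivery is at-least-once.
                row.status = Status.PUBLISHED;
                published++;
            } catch (RuntimeException e) {
                // Leave the row pending for the next pass until attempts run out.
                if (row.attempts >= maxAttempts) {
                    row.status = Status.FAILED;
                }
            }
        }
        return published;
    }
}
```

Because a crash between the send and the status update republishes the event on the next pass, consumers still need the seen-keys deduplication described earlier.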

Multi-step operations spanning services (sanction → docs → eSign → mandate → disbursement) need orchestration with rollback on failure.

BPMN process (Camunda):

<bpmn:process id="sanctionToDisbursement">
<bpmn:startEvent id="start" />
<bpmn:serviceTask id="generateDocs" camunda:expression="${docsService.generateAll(sanctionId)}" />
<bpmn:serviceTask id="kfsAck" camunda:expression="${kfsService.waitForAck(sanctionId)}" />
<bpmn:serviceTask id="esignFlow" camunda:expression="${esignService.startFlow(sanctionId)}" />
<bpmn:serviceTask id="estamp" camunda:expression="${estampService.issue(sanctionId)}" />
<bpmn:serviceTask id="mandate" camunda:expression="${mandateService.activate(sanctionId)}" />
<bpmn:serviceTask id="disburse" camunda:expression="${disbursementService.execute(sanctionId)}" />
<bpmn:endEvent id="end" />
<!-- Compensation handlers -->
<bpmn:boundaryEvent attachedToRef="generateDocs" cancelActivity="false">
<bpmn:compensateEventDefinition />
</bpmn:boundaryEvent>
<!-- ... per step -->
</bpmn:process>

The same flow as a Temporal workflow, with compensations registered on a Saga:

@WorkflowInterface
public interface SanctionToDisbursementWorkflow {
@WorkflowMethod void execute(Long sanctionId);
}
public class SanctionToDisbursementImpl implements SanctionToDisbursementWorkflow {
private final DocsActivities docs = Workflow.newActivityStub(DocsActivities.class);
private final EsignActivities esign = Workflow.newActivityStub(EsignActivities.class);
private final EstampActivities estamp = Workflow.newActivityStub(EstampActivities.class);
private final MandateActivities mandate = Workflow.newActivityStub(MandateActivities.class);
private final DisbursementActivities disburse = Workflow.newActivityStub(DisbursementActivities.class);
@Override
public void execute(Long sanctionId) {
var saga = new Saga(new Saga.Options.Builder().build());
try {
var docIds = docs.generateAll(sanctionId);
saga.addCompensation(() -> docs.voidAll(docIds));
esign.startFlowAndWait(sanctionId, docIds);
// No compensation; eSign is borrower-side; once done, done
var stamp = estamp.issue(sanctionId);
saga.addCompensation(() -> estamp.cancel(stamp));
mandate.activateAndWait(sanctionId);
saga.addCompensation(() -> mandate.cancel(sanctionId));
disburse.execute(sanctionId);
// No compensation; disbursement is irreversible once UTR
} catch (Exception e) {
saga.compensate();
throw Workflow.wrap(e);
}
}
}
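
The key property of the saga is that compensations undo completed steps, most recent first. The sketch below models only that bookkeeping in plain Java, assuming compensations run in reverse order of registration and that one failing compensation should not stop the rest. SimpleSaga is a hypothetical class and is not the workflow engine's Saga.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, engine-free model of saga compensation bookkeeping.
final class SimpleSaga {
    // A stack: the most recently registered compensation runs first.
    private final Deque<Runnable> compensations = new ArrayDeque<>();

    /** Register an undo action right after the step it undoes has succeeded. */
    void addCompensation(Runnable undo) {
        compensations.push(undo);
    }

    /**
     * Runs all registered compensations in reverse order of registration.
     * Failures are collected and returned so that one failing undo does not
     * prevent the remaining ones from running.
     */
    List<RuntimeException> compensate() {
        List<RuntimeException> failures = new ArrayList<>();
        while (!compensations.isEmpty()) {
            Runnable undo = compensations.pop();
            try {
                undo.run();
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        return failures;
    }
}
```

Applied to the flow above, a failure at disbursement would cancel the mandate, then cancel the stamp, then void the documents; steps without a registered compensation (eSign, disbursement) are simply absent from the stack.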

Every request and event carries a trace ID for end-to-end observability:

@Component
public class TracingFilter implements Filter {
@Override
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
throws IOException, ServletException {
String traceId = ((HttpServletRequest) req).getHeader("X-Trace-Id");
if (traceId == null) traceId = UUID.randomUUID().toString();
MDC.put("traceId", traceId);
((HttpServletResponse) res).setHeader("X-Trace-Id", traceId);
try {
chain.doFilter(req, res);
} finally {
MDC.clear();
}
}
}

For Kafka events, the producer copies traceId into a record header and the consumer restores it into MDC before processing.
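
A minimal sketch of that round trip follows. Kafka record header values are byte arrays, so the trace ID is encoded as UTF-8; a plain Map stands in for the record's headers here, and the header name and the TraceHeaders class are assumptions rather than platform conventions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper; a Map<String, byte[]> stands in for Kafka record headers.
final class TraceHeaders {
    static final String HEADER = "X-Trace-Id"; // assumed header name

    private TraceHeaders() {}

    /** Producer side: copy the current trace ID into the outgoing headers. */
    static void inject(Map<String, byte[]> headers, String traceId) {
        headers.put(HEADER, traceId.getBytes(StandardCharsets.UTF_8));
    }

    /** Consumer side: read the trace ID back, or start a new trace if none was sent. */
    static String extractOrCreate(Map<String, byte[]> headers) {
        byte[] raw = headers.get(HEADER);
        if (raw == null || raw.length == 0) {
            return UUID.randomUUID().toString();
        }
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

On the consumer, the extracted value would be put into MDC under the same "traceId" key the HTTP filter uses and cleared in a finally block, so listener threads do not leak trace IDs between records.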

These patterns are non-negotiable foundations. Every service the platform ships uses idempotency, audit logging, vendor adapters and the outbox; many use sagas. A service that skips them will fail compliance review or be hard to debug in production.