Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/persist webhook events #24

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.stripe.model.Subscription;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -99,10 +100,17 @@ public Response webhook(String json, @Context HttpServletRequest request) throws
// takes a while, so spin it off as a new thread
Runnable thread = () -> {
try {
log.info("Saving event with id '{}'...", event.getId());
env.webhookRequestService().persist(event.getId(), "stripe", new JSONObject(event));
processEvent(event.getType(), stripeObject, env);

// TODO: decide if we want to delete right away or keep it for future analysis/analytics
//env.webhookRequestService().delete(event.getId());
} catch (Exception e) {
log.error("failed to process the Stripe event", e);
// TODO: email notification?
log.info("Updating event with id '{}' with error details...", event.getId());
env.webhookRequestService().updateWithErrorMessage(event.getId(), e.getMessage());
}
};
new Thread(thread).start();
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/impactupgrade/nucleus/dao/Dao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.impactupgrade.nucleus.dao;

import java.util.Optional;

public interface Dao<I, E> {
VSydor marked this conversation as resolved.
Show resolved Hide resolved

E create(E e);

Optional<E> get(I id);

E update(E e);

Optional<E> delete(I id);

}
65 changes: 65 additions & 0 deletions src/main/java/com/impactupgrade/nucleus/dao/WebhookRequestDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.impactupgrade.nucleus.dao;

import com.impactupgrade.nucleus.model.WebhookRequest;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.cfg.Configuration;
import org.springframework.util.StringUtils;

import java.util.Optional;

public class WebhookRequestDao implements Dao<String, WebhookRequest> {

private final SessionFactory sessionFactory;

public WebhookRequestDao() {
final Configuration configuration = new Configuration();
configuration.addAnnotatedClass(WebhookRequest.class);
this.sessionFactory = configuration.buildSessionFactory(new StandardServiceRegistryBuilder().build());
}

@Override
public WebhookRequest create(WebhookRequest webhookRequest) {
final Session session = sessionFactory.openSession();
Transaction transaction = session.beginTransaction();
session.save(webhookRequest);
transaction.commit();
session.close();
return webhookRequest;
}

@Override
public Optional<WebhookRequest> get(String id) {
if (StringUtils.isEmpty(id)) {
return Optional.empty();
}

final Session session = sessionFactory.openSession();
WebhookRequest webhookRequest = session.get(WebhookRequest.class, id);
session.close();
return Optional.ofNullable(webhookRequest);
}

@Override
public WebhookRequest update(WebhookRequest webhookRequest) {
final Session session = sessionFactory.openSession();
Transaction transaction = session.beginTransaction();
session.update(webhookRequest);
transaction.commit();
session.close();
return webhookRequest;
}

@Override
public Optional<WebhookRequest> delete(String id) {
final Session session = sessionFactory.openSession();
Transaction transaction = session.beginTransaction();
WebhookRequest webhookRequest = session.get(WebhookRequest.class, id);
session.delete(webhookRequest);
transaction.commit();
session.close();
return Optional.ofNullable(webhookRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.impactupgrade.nucleus.client.TwilioClient;
import com.impactupgrade.nucleus.service.logic.ContactService;
import com.impactupgrade.nucleus.service.logic.DonationService;
import com.impactupgrade.nucleus.service.logic.WebhookRequestService;
import com.impactupgrade.nucleus.service.logic.MessagingService;
import com.impactupgrade.nucleus.service.segment.CrmService;
import com.impactupgrade.nucleus.service.segment.EmailPlatformService;
Expand Down Expand Up @@ -101,6 +102,7 @@ public void setOtherContext(MultivaluedMap<String, String> otherContext) {
public DonationService donationService() { return new DonationService(this); }
public ContactService contactService() { return new ContactService(this); }
public MessagingService messagingService() { return new MessagingService(this); }
public WebhookRequestService webhookRequestService() { return new WebhookRequestService(this); }

// segment services

Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/impactupgrade/nucleus/model/WebhookRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.impactupgrade.nucleus.model;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.util.Date;

@Entity
@Table(name = "webhook_request", schema = "public")
public class WebhookRequest {
@Id
public String id;
@Column(name = "payload_type", nullable = false)
public String payloadType;
@Column(name = "payload", nullable = false)
public String payload;
@Column(name = "error_message")
public String errorMessage;
@Column(name = "attempt_count")
public int attemptCount;
@Column(name = "first_attempt_time", nullable = false)
public Date firstAttemptTime;
@Column(name = "last_attempt_time")
public Date lastAttemptTime;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.impactupgrade.nucleus.service.logic;

import com.google.common.base.Strings;
import com.impactupgrade.nucleus.dao.WebhookRequestDao;
import com.impactupgrade.nucleus.environment.Environment;
import com.impactupgrade.nucleus.model.WebhookRequest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import java.util.Date;
import java.util.Objects;
import java.util.Optional;

public class WebhookRequestService {

private static final Logger log = LogManager.getLogger(WebhookRequestService.class);

private Environment env;
private WebhookRequestDao webhookRequestDao;

public WebhookRequestService(Environment env) {
this.env = env;
this.webhookRequestDao = new WebhookRequestDao();
}

public void persist(String id, String payloadType, JSONObject jsonObject) {
if (Strings.isNullOrEmpty(id) || Strings.isNullOrEmpty(payloadType) || Objects.isNull(jsonObject)) {
return;
}
try {
upsert(newWebhookRequest(id, payloadType, jsonObject));
} catch (Exception e) {
log.error("Failed to persist webhook request!", e);
}
}

public void updateWithErrorMessage(String id, String errorMessage) {
if (Strings.isNullOrEmpty(id)) {
return;
}
try {
Optional<WebhookRequest> existingRecord = webhookRequestDao.get(id);
if (existingRecord.isPresent()) {
WebhookRequest webhookRequest = existingRecord.get();
webhookRequest.errorMessage = errorMessage;
webhookRequestDao.update(webhookRequest);
}
} catch (Exception e) {
log.error("Failed to update webhook request!", e);
}
}

public void delete(String id) {
if (Strings.isNullOrEmpty(id)) {
return;
}
try {
webhookRequestDao.delete(id);
} catch (Exception e) {
log.error("Failed to delete failed request!", e);
}
}

// Utils
private WebhookRequest newWebhookRequest(String id, String payloadType, JSONObject payload) {
WebhookRequest webhookRequest = new WebhookRequest();
webhookRequest.id = id;
webhookRequest.payloadType = payloadType;
webhookRequest.payload = payload.toString();
webhookRequest.attemptCount = 1;
webhookRequest.firstAttemptTime = new Date();
webhookRequest.lastAttemptTime = new Date();
return webhookRequest;
}

private void upsert(WebhookRequest webhookRequest) {
Optional<WebhookRequest> existingRecord = webhookRequestDao.get(webhookRequest.id);
if (existingRecord.isEmpty()) {
webhookRequestDao.create(webhookRequest);
} else {
int attempts = existingRecord.get().attemptCount + 1;
webhookRequest.attemptCount = attempts;
webhookRequest.lastAttemptTime =new Date();
webhookRequestDao.update(webhookRequest);
}
}

}