Skip to content

Commit 59a1c3f

Browse files
committed
[Fix #1395] Handling duplicated id
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent e9a2821 commit 59a1c3f

3 files changed

Lines changed: 18 additions & 14 deletions

File tree

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.util.ArrayList;
2424
import java.util.Collection;
2525
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.LinkedHashSet;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.Map.Entry;
@@ -90,7 +92,7 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
9092
CorrelationOperations operations, String reg, CloudEvent event) {
9193
logger.debug(
9294
"Received event {} for definition {} and registration {}", event, definition.id(), reg);
93-
Map<String, List<CloudEvent>> events = initMap();
95+
Map<String, Collection<CloudEvent>> events = initMap();
9496
operations.retrieveEvents(events);
9597
events.get(reg).add(event);
9698
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
@@ -99,39 +101,43 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
99101
return result;
100102
}
101103

102-
private Map<String, List<CloudEvent>> initMap() {
104+
private Map<String, Collection<CloudEvent>> initMap() {
103105
return id2RegMapping.keySet().stream()
104-
.collect(Collectors.toMap(k -> k, k -> new ArrayList<>()));
106+
.collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>()));
105107
}
106108

107109
private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck(
108110
CorrelationOperations operations) {
109111
logger.debug("Checking cloud events for definition {}", definition.id());
110112
operations.clearProcessed();
111-
Map<String, List<CloudEvent>> events = initMap();
113+
Map<String, Collection<CloudEvent>> events = initMap();
112114
operations.retrieveEvents(events);
113115
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
114116
markProcessed(operations, result);
115117
return result;
116118
}
117119

118120
private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation(
119-
Map<String, List<CloudEvent>> events) {
121+
Map<String, Collection<CloudEvent>> events) {
120122
logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events);
121123
if (events.isEmpty()) {
122124
return List.of();
123125
}
124126
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = new ArrayList<>();
127+
Map<String, Iterator<CloudEvent>> iteratingEvents =
128+
events.entrySet().stream()
129+
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
125130
boolean notDone = true;
126131
while (notDone) {
127132
Map<EventRegistrationBuilder, CloudEvent> row = new HashMap<>();
128-
for (Entry<String, List<CloudEvent>> item : events.entrySet()) {
129-
List<CloudEvent> list = item.getValue();
130-
if (list.isEmpty()) {
133+
for (Entry<String, Iterator<CloudEvent>> item : iteratingEvents.entrySet()) {
134+
Iterator<CloudEvent> iter = item.getValue();
135+
if (!iter.hasNext()) {
131136
notDone = false;
132137
break;
133138
}
134-
row.put(id2RegMapping.get(item.getKey()), list.remove(0));
139+
row.put(id2RegMapping.get(item.getKey()), iter.next());
140+
iter.remove();
135141
}
136142
if (notDone) {
137143
result.add(row);

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import java.util.Collection;
20-
import java.util.List;
2120
import java.util.Map;
2221

2322
interface CorrelationOperations {
2423

25-
default void retrieveEvents(Map<String, List<CloudEvent>> reg2EventsMap) {}
24+
default void retrieveEvents(Map<String, Collection<CloudEvent>> reg2EventsMap) {}
2625

2726
default void storeEvent(String regId, CloudEvent event) {}
2827

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo;
2929
import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo;
3030
import java.util.Collection;
31-
import java.util.List;
3231
import java.util.Map;
3332
import java.util.Map.Entry;
3433
import java.util.Optional;
@@ -112,13 +111,13 @@ public void clearStatus(WorkflowContextData workflowContext) {
112111
}
113112

114113
@Override
115-
public void retrieveEvents(Map<String, List<CloudEvent>> events) {
114+
public void retrieveEvents(Map<String, Collection<CloudEvent>> events) {
116115
events
117116
.entrySet()
118117
.forEach(
119118
e -> {
120119
String regId = e.getKey();
121-
List<CloudEvent> cloudEvents = e.getValue();
120+
Collection<CloudEvent> cloudEvents = e.getValue();
122121
Map<String, P> processedCes = processedCloudEvents(regId);
123122
cloudEvents(regId).values().stream()
124123
.map(this::unmarshallCloudEvent)

0 commit comments

Comments
 (0)