Make GeoIp downloader multi-project aware #128282
…oip/GeoIpDownloader.java Co-authored-by: Copilot <[email protected]>
Pinging @elastic/es-data-management (Team:Data Management)
Pull Request Overview
This PR refactors the GeoIP downloader to be aware of multiple projects by scoping persistent tasks, routing table lookups, and state updates per project.
- Introduces project-scoped task IDs and a ProjectResolver to manage downloaders per project.
- Extends ClusterChangedEvent, task executor, downloader, transport actions, and tests to pass and respect ProjectId.
- Adds a multi-project integration test and updates existing unit tests for project context.
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java | Added testChangedCustomProjectMetadataSet to verify per-project custom metadata changes. |
server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java | Added changedCustomProjectMetadataSet(ProjectId) for per-project metadata diffs. |
muted-tests.yml | Unmuted old ingest-geoip client YAML tests. |
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java | Updated unit tests to initialize and pass ProjectId and ProjectResolver. |
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java | Replaced cluster-state APIs in tests with project-scoped builders. |
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceTests.java | Extended createClusterState helpers to accept ProjectId. |
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpStatsTransportAction.java | Injected ProjectResolver and fetch task per project. |
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java | Wired ProjectResolver into the downloader executor. |
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java | Refactored to maintain a map of downloaders per project, use per-project settings, and handle cluster changes. |
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java | Made downloader project-aware in index lookups, logging, and boundary checks. |
modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java | New REST integration test for multi-project GeoIP download behavior. |
modules/ingest-geoip/qa/multi-project/build.gradle | Added the multi-project QA module to build and test. |
Comments suppressed due to low confidence (1)
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java:239
- There are no unit tests covering the multi-project clusterChanged logic for starting and stopping tasks per project. Add tests to verify behavior when settings change, pipelines are added/removed, and projects are deleted.
if (event.metadataChanged() == false) {
GeoIpDownloader currentDownloader = getCurrentTask();
if (currentDownloader != null) {
currentDownloader.requestReschedule();
for (var projectMetadataEntry : event.state().metadata().projects().entrySet()) {
The clusterChanged method only iterates over the projects present in the new state, so removed projects never trigger task removal. Consider also iterating previousState.metadata().projects() to stop tasks for projects that were deleted.
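Detecting deleted projects amounts to a set difference between the project IDs of the previous and the current cluster state. A minimal sketch, assuming plain strings as stand-ins for ProjectId and plain maps in place of the real project metadata (the class and method names here are hypothetical, not Elasticsearch API):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RemovedProjects {
    // Returns the IDs that exist in the previous state but not in the current one.
    // Map keys stand in for ProjectId; values stand in for ProjectMetadata (assumption).
    static Set<String> removedProjectIds(Map<String, ?> previous, Map<String, ?> current) {
        Set<String> removed = new HashSet<>(previous.keySet());
        removed.removeAll(current.keySet());
        return removed;
    }

    public static void main(String[] args) {
        Map<String, String> previous = Map.of("p1", "meta", "p2", "meta");
        Map<String, String> current = Map.of("p1", "meta");
        System.out.println(removedProjectIds(previous, current)); // prints [p2]
    }
}
```

Whether the task executor should react to such a difference at all, or whether tasks must already be cancelled before a project disappears, is a separate design question.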
Not sure if this is valid. To delete a project, the related persistent tasks ought to be cancelled before the project metadata is deleted from the cluster state, otherwise all sorts of things can go wrong. Project deletion should probably be handled separately.
the related persistent tasks ought to be cancelled before the project metadata is deleted from the cluster state, otherwise all sorts of things can go wrong
Yep, I agree. Project deletion is currently still being discussed, so we can leave this for now. Can you put a @FixForMultiProject with a comment to remind us that we'll want to have some safeguards in place for project deletion? I'm sure we'll want to have safeguards, but I'm not sure yet what we should do as a safeguard, hence the annotation reminder.
Should the safeguard be at the place where project deletion is handled (which probably doesn't exist yet) instead of here? Perhaps it's better to track this safeguarding in the work for project deletion in general. I added a comment here as a reminder.
I'd prefer that we put the annotation here as well. That way we're sure it won't be forgotten.
Thanks for working on this, Sam! And thanks for splitting the changes up into a very comfortable size :)
- Can you also update the title and/or final squashed commit message to be a bit more specific?
- Did you try running the GeoIP YAML tests in MP mode too? Or do those require some more work?
Updated the title, and I will for sure squash the commits before merging.
The GeoIP YAML tests require more work in MP mode, namely loading the databases from the system index onto the ingest node. To limit the size of this PR, I will include that part in another PR.
if (clusterService.state().nodes().isLocalNodeElectedMaster() == false) {
// we should only start/stop task from single node, master is the best as it will go through it anyway
return;
}
if (enabled) {
startTask(() -> {});
startTask(projectResolver.getProjectId(), () -> {});
This only works because the project resolver currently still returns a fallback project ID (which will be removed in the future). This will need to be updated in the future anyway as well (when these settings are made project-aware), but I'd still prefer that we just hard-code ProjectId.DEFAULT instead. That way we at least don't rely on temporary behavior. The same goes for the methods/settings below.
for (var projectMetadataEntry : event.state().metadata().projects().entrySet()) {
    ProjectId projectId = projectMetadataEntry.getKey();
    ProjectMetadata projectMetadata = projectMetadataEntry.getValue();
- for (var projectMetadataEntry : event.state().metadata().projects().entrySet()) {
-     ProjectId projectId = projectMetadataEntry.getKey();
-     ProjectMetadata projectMetadata = projectMetadataEntry.getValue();
+ for (var projectMetadata : event.state().metadata().projects().values()) {
+     ProjectId projectId = projectMetadata.id();
boolean taskIsBootstrapped = taskIsBootstrappedByProject.computeIfAbsent(projectId, k -> false);
if (taskIsBootstrapped != true) {
taskIsBootstrappedByProject.put(projectId, true);
this.taskIsBootstrappedByProject.computeIfAbsent(projectId, k -> hasAtLeastOneGeoipProcessor(projectMetadata));
Ah, I missed that taskIsBootstrappedByProject is used async in startTask and stopTask. The current code isn't 100% airtight. I was checking on ConcurrentHashMap if there's anything we can use, but I don't think so. ConcurrentHashMap#replace is close, but it doesn't account for unset/null values... So, I think we have to go back to the AtomicBoolean inside the map again, sorry about that. I think the other maps are OK like this.
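To illustrate why an AtomicBoolean inside the map closes the gap: computeIfAbsent atomically creates the per-project flag, and compareAndSet then lets exactly one caller flip it. A minimal sketch, assuming String stands in for ProjectId and with hypothetical class and method names (this is not the code from the PR):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class BootstrapTracker {
    // One flag per project; String stands in for ProjectId (assumption).
    private final ConcurrentHashMap<String, AtomicBoolean> bootstrappedByProject = new ConcurrentHashMap<>();

    // Returns true for exactly one caller per project until reset, even under concurrent calls.
    boolean tryMarkBootstrapped(String projectId) {
        AtomicBoolean flag = bootstrappedByProject.computeIfAbsent(projectId, k -> new AtomicBoolean(false));
        return flag.compareAndSet(false, true);
    }

    // Clears the flag so a later tryMarkBootstrapped call can succeed again.
    void reset(String projectId) {
        AtomicBoolean flag = bootstrappedByProject.get(projectId);
        if (flag != null) {
            flag.set(false);
        }
    }
}
```

A plain Boolean value would need a separate read followed by a put, and another thread could interleave between the two steps.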
var atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.computeIfAbsent(projectId, k -> false);
boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(projectMetadata);
if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
- var atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.computeIfAbsent(projectId, k -> false);
- boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(projectMetadata);
- if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
+ var atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.getOrDefault(projectId, false);
+ if (atLeastOneGeoipProcessor == false && hasAtLeastOneGeoipProcessor(projectMetadata)) {
and move the put below to inside the if-block.
- doing getOrDefault instead of computeIfAbsent avoids the redundant new value
- performing the (relatively expensive) check as the second condition allows us to skip it if we already set this value to true once before (as it'll never get set to false, we only remove the entry)
- we don't need to put false on every run
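The points above can be sketched as follows. This is an illustration only: String stands in for ProjectId, the BooleanSupplier stands in for hasAtLeastOneGeoipProcessor(projectMetadata), and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

class GeoipProcessorFlags {
    private final Map<String, Boolean> atLeastOneGeoipProcessorByProject = new ConcurrentHashMap<>();

    // Returns true only on the transition from "no geoip processor known" to "at least one".
    boolean onClusterChanged(String projectId, BooleanSupplier expensiveCheck) {
        // getOrDefault reads without storing a default entry in the map.
        boolean known = atLeastOneGeoipProcessorByProject.getOrDefault(projectId, false);
        // && short-circuits, so the expensive check is skipped once the flag is true.
        if (known == false && expensiveCheck.getAsBoolean()) {
            atLeastOneGeoipProcessorByProject.put(projectId, true);
            return true;
        }
        return false;
    }
}
```

Because the put happens only inside the if-block, the map never holds a false entry; an absent key already means false.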
.projectState(projectId)
.metadata()
- .projectState(projectId)
- .metadata()
+ .metadata()
+ .getProject(projectId)
No need to convert to ProjectState here.
@@ -287,4 +281,14 @@ private ClusterState clusterStateWithIndex(Consumer<Settings.Builder> consumer,
.build();
return ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
I think we can remove the clusterStateWithIndex method, right?
Set<String> changed = new HashSet<>();
ProjectMetadata project = state.metadata().projects().get(projectId);
ProjectMetadata previousProject = previousState.metadata().projects().get(projectId);
if (previousProject != null && project != null) {
    changed.addAll(changedCustoms(project.customs(), previousProject.customs()));
} else if (previousProject != null) {
    changed.addAll(previousProject.customs().keySet());
} else if (project != null) {
    changed.addAll(project.customs().keySet());
}
return changed.contains(customMetadataType);
This still computes the entire Set unnecessarily. What do you think about something like this:
- Set<String> changed = new HashSet<>();
- ProjectMetadata project = state.metadata().projects().get(projectId);
- ProjectMetadata previousProject = previousState.metadata().projects().get(projectId);
- if (previousProject != null && project != null) {
-     changed.addAll(changedCustoms(project.customs(), previousProject.customs()));
- } else if (previousProject != null) {
-     changed.addAll(previousProject.customs().keySet());
- } else if (project != null) {
-     changed.addAll(project.customs().keySet());
- }
- return changed.contains(customMetadataType);
+ ProjectMetadata previousProject = previousState.metadata().projects().get(projectId);
+ ProjectMetadata project = state.metadata().projects().get(projectId);
+ Object previousValue = previousProject == null ? null : previousProject.customs().get(customMetadataType);
+ Object value = project == null ? null : project.customs().get(customMetadataType);
+ return Objects.equals(previousValue, value) == false;
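The idea behind the suggestion is to compare only the one custom entry of interest instead of building the full set of changed keys. A self-contained sketch, assuming plain maps stand in for a project's customs and null stands in for a project that is absent from a state (class and method names are hypothetical):

```java
import java.util.Map;
import java.util.Objects;

class CustomChange {
    // previous/current are the customs maps of one project, or null if the project
    // does not exist in that state. Plain maps stand in for ProjectMetadata (assumption).
    static boolean customChanged(Map<String, ?> previous, Map<String, ?> current, String type) {
        Object previousValue = previous == null ? null : previous.get(type);
        Object value = current == null ? null : current.get(type);
        // Objects.equals treats two nulls as equal, so "absent in both" counts as unchanged.
        return Objects.equals(previousValue, value) == false;
    }
}
```

This performs two lookups and one equality check, regardless of how many other customs the project holds.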
.putProjectMetadata(ProjectMetadata.builder(project2).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_na_", newState, originalState);
assertTrue(event.customMetadataChanged(project2.id(), IndexGraveyard.TYPE));
Is this IndexGraveyard line a leftover, or did you put it there intentionally?
This change makes the GeoIp persistent task executor/downloader multi-project aware.
In MP mode, the persistent task ID takes the form <project-id>/geoip-downloader.

To keep the PR at a review-friendly size, this PR only focuses on the downloading part of the GeoIP databases; more changes will come in separate PRs to make GeoIP multi-project aware in general.
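The task ID format named in the description, <project-id>/geoip-downloader, could be produced by a small helper like the one below. This is a sketch under assumptions: String stands in for ProjectId, and the class, constant, and method names are hypothetical rather than taken from the PR.

```java
class GeoIpTaskIds {
    // Base task name as it appears in the description's format string.
    static final String GEOIP_DOWNLOADER = "geoip-downloader";

    // Builds a project-scoped task ID of the form <project-id>/geoip-downloader.
    static String taskId(String projectId) {
        return projectId + "/" + GEOIP_DOWNLOADER;
    }

    public static void main(String[] args) {
        System.out.println(taskId("project-1")); // prints project-1/geoip-downloader
    }
}
```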