The answer is to not use /opt/keycloak/bin/kc.sh import at all. Instead use partial import api route:
https://www.keycloak.org/docs-api/22.0.1/rest-api/index.html#:~:text=POST%20/admin/realms/{realm}/partialImport
You’re welcome
def post_users_keycloak_multi_threaded(realm, user_files):
num_threads = 10
file_batches = numpy.array_split(numpy.array(user_files),num_threads)
threads = []
for file_batch in file_batches:
thread = threading.Thread(target=post_users_in_thread, args=(realm, file_batch.tolist()))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
def post_users_in_thread(realm, file_batch):
while file_batch: # Continue processing as long as there are files left
for file in file_batch.copy(): # Iterate over a copy of the batch to safely modify it
success = False
attempts = 0
while not success and attempts < 10: # Retry up to 10 times for each file
try:
access_token = get_access_token()
response = post_users(file, realm, access_token)
if response.status_code == 200:
print(f"Import successful: {file}")
file_batch.remove(file) # Remove from the batch on success
success = True # Set success to True to end retry loop
else:
logger.error(f"Error on {file}: {response.status_code}, {response.text}...retrying")
attempts += 1
time.sleep(2 ** attempts) # Exponential backoff strategy
except requests.exceptions.RequestException as e:
# Handle specific request exceptions (e.g. timeout, connection error)
logger.error(f"Request failed for {file}: {e}...retrying")
attempts += 1
time.sleep(2 ** attempts) # Exponential backoff strategy
if attempts >= 10:
# Log final failure after max attempts
logger.error(f"Failed to process {file} after {attempts} attempts.")
def post_users(users_file, realm, access_token):
host = os.getenv('KC_HOST')
with open(users_file, "r") as f:
import_data = json.load(f)
url = f"{host}/admin/realms/{realm}/partialImport"
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {access_token}"}
response = requests.post(url, headers=headers, json=import_data)
return response
def get_access_token():
host = os.getenv('KC_HOST')
url = f"{host}/realms/master/protocol/openid-connect/token"
client_id = os.getenv('CLIENT_ID')
client_secret = os.getenv('CLIENT_SECRET')
grant_type = "client_credentials"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": grant_type,
"client_id": client_id,
"client_secret": client_secret,
}
access_token_response = requests.post(url, headers=headers, data=data)
response = access_token_response.json()
access_token = response.get("access_token")
return access_token