import logging import time import threading import copy from fedora_messaging import config import asyncio from . import beaker from . import exceptions as exc from . import log from . import conf_test_cases from .settings import Settings #config.conf.setup_logging() import os import re import requests logger = log.get_logger(__name__) def _find_true_body(message): """Currently the ZMQ->AMQP bridge produces a message with the entire fedmsg as the 'body'. When the publisher is converted to AMQP it will likely only include the 'msg' dict as the 'body'. So let's try and make sure we work either way... https://github.com/fedora-infra/fedmsg-migration-tools/issues/20 """ body = message.body if 'msg' in body and 'msg_id' in body: # OK, pretty sure this is a translated fedmsg, take 'msg' body = body['msg'] return body def consume_message(message): body = _find_true_body(message) location = body.get('location') status = body.get('status') compstr = body.get('compose_id', location) release_arch = config.conf["consumer_config"]["release_arch"] do_report = config.conf["consumer_config"]["do_report"] wiki_hostname = config.conf["consumer_config"]["wiki_hostname"] recent_release = config.conf["consumer_config"]["recent_release"] if 'Fedora-Modular' in compstr: logger.info("Not scheduling jobs for modular compose %s", compstr) return [] if 'Fedora-Cloud' in compstr: logger.info("Not scheduling jobs for cloud compose %s", compstr) return [] if 'Fedora-Epel' in compstr: logger.info("Not scheduling jobs for Epel compose %s", compstr) return [] if 'Fedora-IoT' in compstr: logger.info("Not scheduling jobs for IoT compose %s", compstr) return [] if 'updates-testing' in compstr: logger.info("Not scheduling jobs for updates-testing %s", compstr) return [] if 'updates' in compstr: logger.info("Not scheduling jobs for updates %s", compstr) return [] if 'Fedora-Container' in compstr: logger.info("Not scheduling jobs for Fedora-Container %s", compstr) return [] if 'FINISHED' in status and location: if 'n' in body["compose_id"]: beaker_distro = body["compose_id"] else: beaker_distro = 'Fedora-' + str(int(recent_release) + 1) data = { "cpu-arch": release_arch, "beaker-distro": body["compose_id"], "system-type": "baremetal", "do_report": do_report, "wiki_hostname": wiki_hostname, "recent_release": recent_release, "compose_id": body["compose_id"] } return data def populate_data(data): ks_list = conf_test_cases.Ks_List data_list = [] arch = data["cpu-arch"] release_str = re.split('-', data["beaker-distro"])[1] compose_name = re.split('-', data["beaker-distro"])[2] if release_str == 'Rawhide': release_number = int(data["recent_release"]) + 1 base_url = "https://kojipkgs.fedoraproject.org/compose/rawhide/" download_url = os.path.join(base_url, data["beaker-distro"], "compose/Server", data["cpu-arch"], "iso", "Fedora-Server-dvd-%s-%s-%s.iso")%(arch, release_str, compose_name) else: release_number = release_str base_url = "https://kojipkgs.fedoraproject.org/compose/branched/" download_url = os.path.join(base_url, data["beaker-distro"], "compose/Server", data["cpu-arch"], "iso", "Fedora-Server-dvd-%s-%s-%s.iso")%(arch, release_str, compose_name) for ks_data in ks_list: (ts_name , params), = ks_data.items() temp = copy.deepcopy(data) temp["ts_name"] = ts_name for key, value in params.items(): temp[key] = value if ts_name == "QA:Testcase_Install_to_Previous_KVM": previous = int(release_number) - 1 temp['beaker-distro'] = 'Fedora-' + str(previous) temp['real-distro'] = data['beaker-distro'] temp['beaker-family'] = 'Fedora' + str(previous) temp['ks_append'] = """ %%post wget %s -P /var/lib/libvirt/images/ %%end """%download_url if ts_name == "QA:Testcase_Install_to_Current_KVM" or ts_name == "QA:Testcase_install_to_VirtIO" or ts_name =="QA:Testcase_partitioning_custom_software_RAID": temp['ks_append'] = """ %%post wget %s -P /var/lib/libvirt/images/ %%end """%download_url if ts_name == "QA:Testcase_upgrade_dnf_previous_server": previous = int(release_number) - 2 temp['beaker-distro'] = 'Fedora-' + str(previous) temp['real-distro'] = data['beaker-distro'] temp['beaker-family'] = 'Fedora' + str(previous) temp['ks_append'] = """ %%post dnf install restraint-rhts -y echo %s > /root/release %%end """%release_number if ts_name == "QA:Testcase_upgrade_dnf_current_server": current = int(release_number) - 1 temp['beaker-distro'] = 'Fedora-' + str(current) temp['real-distro'] = data['beaker-distro'] temp['beaker-family'] = 'Fedora' + str(current) temp['ks_append'] = """ %%post dnf install restraint-rhts -y echo %s > /root/release %%end """%release_number if ts_name == "QA:Testcase_upgrade_dnf_previous_workstation": previous = int(release_number) - 2 temp['beaker-distro'] = 'Fedora-' + str(previous) temp['real-distro'] = data['beaker-distro'] temp['ks_append'] = """ %%packages @^workstation-product-environment %%end %%post dnf install restraint-rhts -y echo %s > /root/release systemctl enable sshd %%end """%release_number if ts_name == "QA:Testcase_upgrade_dnf_current_workstation": current = int(release_number) - 1 temp['beaker-distro'] = 'Fedora-' + str(current) temp['real-distro'] = data['beaker-distro'] temp['ks_append'] = """ %%packages @^workstation-product-environment %%end %%post systemctl enable sshd dnf install restraint-rhts -y echo %s > /root/release %%end """%release_number if ts_name == "QA:Testcase_upgrade_dnf_previous_minimal": previous = int(release_number) - 2 temp['beaker-distro'] = 'Fedora-' + str(previous) temp['real-distro'] = data['beaker-distro'] temp['ks_append'] = """ %%packages @^minimal-environment %%end %%post systemctl enable sshd dnf install restraint-rhts -y echo %s > /root/release %%end """%release_number if ts_name == "QA:Testcase_upgrade_dnf_current_minimal": current = int(release_number) - 1 temp['beaker-distro'] = 'Fedora-' + str(current) temp['real-distro'] = data['beaker-distro'] temp['ks_append'] = """ %%packages @^minimal-environment %%end %%post systemctl enable sshd dnf install restraint-rhts -y echo %s > /root/release %%end """%release_number data_list.append(temp) return data_list async def process_data(data): data_list = populate_data(data) tasks = [beaker.process(data) for data in data_list] await asyncio.gather(*tasks) class Consumer: def __init__(self): """ Initialize the consumer, sets up processor. """ pass def __call__(self, message): try: logger.info(message) data = consume_message(message) if data: #We have to give beaker some time to sync the repo time.sleep(10800) if 'Rawhide' in data["beaker-distro"]: base_http_url = "http://download-01.eng.brq.redhat.com/released/fedora/rawhide" else: base_http_url = "http://download-01.eng.brq.redhat.com/released/fedora/branched" http_url = os.path.join(base_http_url, data["beaker-distro"], "Server", data["cpu-arch"], "os") #recently beaker syncing job seems take longer time logger.info("http_url: %s"%http_url) while requests.get(http_url).status_code != 200: logger.info("http_url: %s"%http_url) time.sleep(600) logger.info("checking") asyncio.run(process_data(data)) except Exception as e: logger.error("consumer failed: %s"%e)