def replace_genomes(input_file, next_barcode_fn, output_file):
"""
Replaces genomes in infected individuals and vectors.
Args:
input_file (str): Input serialized population file
next_barcode_fn (Callable): Function that return the next barcode. The function is called once for every infection of an
individual and once for every vector in the vector population.
output_file (str): Output file with replaced genomes.
"""
if not os.path.exists(input_file):
raise Exception(f"Couldn't find specified input file: {input_file}.")
if next_barcode_fn is None:
raise Exception("You must provide a function that returns the next barcode string")
pop = SerPop.SerializedPopulation(input_file)
ser_pop_genome_map = pop.dtk.simulation["ParasiteGenetics"]["m_ParasiteGenomeMap"]
ser_pop_genome_map.clear()
cache_genome_map = {}
for node in pop.nodes:
# tic1 = time.perf_counter()
for person in node["individualHumans"]:
# print("------------ " + str(person["suid"]["id"]) + " -----------------")
for infection in person["infections"]:
next_genome = get_next_genome(next_barcode_fn, person["suid"]["id"], ser_pop_genome_map, cache_genome_map)
length_barcode = len(infection["infection_strain"]["m_Genome"]["m_pInner"]["m_NucleotideSequence"])
assert length_barcode == len(next_genome["m_pInner"]["m_NucleotideSequence"]), "New barcode has wrong length."
infection["infection_strain"]["m_Genome"] = next_genome
# tic2 = time.perf_counter()
# print(f"{tic2 - tic1:0.4f}")
for vector_pop in node["m_vectorpopulations"]:
# print(len(vector_pop["AdultQueues"]))
# tic1 = time.perf_counter()
for vector in vector_pop["AdultQueues"]["collection"]:
# print("------------ VECTOR " + str(vector["m_ID"]) + " -----------------")
for oocyst in vector["m_OocystCohorts"]:
genome_oocyst = get_next_genome(next_barcode_fn, -999, ser_pop_genome_map, cache_genome_map)
length_oocyst_barcode = len(oocyst["m_MaleGametocyteGenome"]["m_pInner"]["m_NucleotideSequence"])
assert len(genome_oocyst["m_pInner"]["m_NucleotideSequence"]) == length_oocyst_barcode, "New barcode has wrong length."
oocyst["m_MaleGametocyteGenome"] = genome_oocyst
genome_oocyst = get_next_genome(next_barcode_fn, -999, ser_pop_genome_map, cache_genome_map)
length_oocyst_barcode = len(oocyst["m_pStrainIdentity"]["m_Genome"]["m_pInner"]["m_NucleotideSequence"])
assert len(genome_oocyst["m_pInner"]["m_NucleotideSequence"]) == length_oocyst_barcode, "New barcode has wrong length."
oocyst["m_pStrainIdentity"]["m_Genome"] = genome_oocyst
for sporo in vector["m_SporozoiteCohorts"]:
genome_sporo = get_next_genome(next_barcode_fn, -999, ser_pop_genome_map, cache_genome_map)
length_sporo_barcode = len(sporo["m_MaleGametocyteGenome"]["m_pInner"]["m_NucleotideSequence"])
assert len(genome_sporo["m_pInner"]["m_NucleotideSequence"]) == length_sporo_barcode, "New barcode has wrong length."
sporo["m_MaleGametocyteGenome"] = genome_sporo
genome_sporo = get_next_genome(next_barcode_fn, -999, ser_pop_genome_map, cache_genome_map)
length_sporo_barcode = len(sporo["m_pStrainIdentity"]["m_Genome"]["m_pInner"]["m_NucleotideSequence"])
assert len(genome_sporo["m_pInner"]["m_NucleotideSequence"]) == length_sporo_barcode, "New barcode has wrong length."
sporo["m_pStrainIdentity"]["m_Genome"] = genome_sporo
# tic2 = time.perf_counter()
# print(f"{tic2 - tic1:0.4f}")
pop.write(output_file)