# app/api/helpers/csv_jobs_util.py
import pytz
from app.api.admin_sales.utils import event_type, summary
from app.api.helpers.group_user_role import get_user_group_role
from app.models import db
from app.models.access_code import AccessCode
from app.models.helpers.versioning import strip_tags
from app.models.order import OrderTicket
from app.models.ticket import access_codes_tickets
from app.models.user_check_in import VirtualCheckIn
def export_orders_csv(orders):
    """Build the rows of the orders CSV export.

    The first row is the header. Every order whose ``status`` is not
    ``"deleted"`` adds one row of strings. The "Access Code" cell holds the
    code of the first access code found while walking the order's tickets,
    or an empty string when none is found.

    :param orders: iterable of order objects
    :return: list of rows, each row being a list of cell values
    """

    def text(value):
        # Falsy values (None, '', 0) are rendered as an empty cell.
        return str(value) if value else ''

    header = [
        'Order#',
        'Order Date',
        'Status',
        'Payment Type',
        'Payment Mode',
        'Total Amount',
        'Quantity',
        'Discount Code',
        'Access Code',
        'First Name',
        'Last Name',
        'Email',
        'Tax ID',
        'Address',
        'Company',
        'Country',
        'State',
        'City',
        'Zipcode',
    ]
    table = [header]

    for order in orders:
        if order.status == "deleted":
            continue

        # Walk the order's tickets and stop at the first one that resolves
        # to an existing access code.
        code_value = ''
        ticket_links = db.session.query(OrderTicket).filter_by(order_id=order.id)
        for link in ticket_links:
            association = (
                db.session.query(access_codes_tickets)
                .filter(access_codes_tickets.c.ticket_id == link.ticket_id)
                .first()
            )
            if not association:
                continue
            found = (
                db.session.query(AccessCode)
                .filter_by(id=association.access_code_id)
                .first()
            )
            if found:
                code_value = found.code
                break

        buyer = order.user
        table.append(
            [
                str(order.get_invoice_number()),
                text(order.created_at),
                text(order.status),
                text(order.paid_via),
                text(order.payment_mode),
                text(order.amount),
                str(order.tickets_count),
                str(order.discount_code.code) if order.discount_code else '',
                str(code_value),
                text(buyer.first_name) if buyer else '',
                text(buyer.last_name) if buyer else '',
                text(buyer.email) if buyer else '',
                text(order.tax_business_info),
                text(order.address),
                text(order.company),
                text(order.country),
                text(order.state),
                text(order.city),
                text(order.zipcode),
            ]
        )
    return table
def get_order_ticket_data(attendee, order, ticket):
    """Collect the order- and ticket-related export columns for an attendee.

    :param attendee: attendee object; ``identifier`` and ``id`` are read to
        build the "Ticket ID" value
    :param order: the attendee's order, or a falsy value when there is none
    :param ticket: the attendee's ticket, or a falsy value when there is none
    :return: dict mapping column header to string value. The same set of
        keys is returned whether or not ``order`` and ``ticket`` are present.
    """
    if not order:
        return {
            'Order#': '',
            'Order Date': '',
            'Status': '',
            'Payment Type': '',
            'Payment Mode': '',
            'Ticket ID': '',
            'Ticket Name': '',
            'Ticket Price': '0',
            'Ticket Type': '',
            'Tax ID': '',
            'Address': '',
            'Company': '',
            'Country': '',
            'State': '',
            'City': '',
            'Zipcode': '',
        }

    # "Ticket ID" is "<order identifier>-<attendee identifier>", falling back
    # to the attendee's numeric id when it has no identifier of its own.
    ticket_id = ''
    if order.identifier:
        suffix = attendee.identifier if attendee.identifier else str(attendee.id)
        ticket_id = order.identifier + '-' + suffix

    data = {
        'Order#': str(order.get_invoice_number()),
        'Order Date': str(order.created_at.strftime('%B %-d, %Y %H:%M %z'))
        if order.created_at
        else '-',
        'Status': str(order.status) if order.status else '-',
        'Payment Type': str(order.paid_via) if order.paid_via else '',
        'Payment Mode': str(order.payment_mode) if order.payment_mode else '',
        'Ticket ID': ticket_id,
    }
    # get_ticket_data() returns blank defaults for a missing ticket, so it is
    # called unconditionally: every row then carries the ticket columns and
    # the key set matches the no-order branch above.
    data.update(get_ticket_data(ticket))
    data.update(get_order_data(order))
    return data
def get_order_data(order):
    """Return the billing columns of an order as a dict of strings.

    Each attribute is converted with ``str``; falsy attributes, and a falsy
    ``order`` itself, yield empty strings.
    """
    # (column header, order attribute) pairs, in output order.
    columns = (
        ('Tax ID', 'tax_business_info'),
        ('Address', 'address'),
        ('Company', 'company'),
        ('Country', 'country'),
        ('State', 'state'),
        ('City', 'city'),
        ('Zipcode', 'zipcode'),
    )
    if not order:
        return {header: '' for header, _ in columns}

    billing = {}
    for header, attribute in columns:
        value = getattr(order, attribute)
        billing[header] = str(value) if value else ''
    return billing
def get_ticket_data(ticket):
    """Return the name, price and type columns of a ticket as strings.

    A falsy ``ticket`` or falsy attributes fall back to the defaults: empty
    strings for name and type, ``'0'`` for the price.
    """
    details = {'Ticket Name': '', 'Ticket Price': '0', 'Ticket Type': ''}
    if not ticket:
        return details

    # Only truthy attributes replace the defaults.
    if ticket.name:
        details['Ticket Name'] = str(ticket.name)
    if ticket.price:
        details['Ticket Price'] = str(ticket.price)
    if ticket.type:
        details['Ticket Type'] = str(ticket.type)
    return details
def get_attendee_data(attendee, custom_forms, attendee_form_dict):
    """Assemble the export row (as a dict) for a single attendee.

    The row starts with the order/ticket columns from
    ``get_order_ticket_data`` plus an empty 'Email' entry, then gets one
    entry per custom form field keyed by ``field.name``, and finally the
    attendee's virtual check-in times.

    :param attendee: attendee object
    :param custom_forms: iterable of form field objects; ``is_complex``,
        ``identifier`` and ``name`` are read from each
    :param attendee_form_dict: kept for interface compatibility. It is not
        used: the header of every field is ``field.name``.
    :return: dict mapping column header to value
    """
    data = {
        **get_order_ticket_data(attendee, attendee.order, attendee.ticket),
        'Email': '',
    }

    for field in custom_forms:
        if field.is_complex:
            # Complex fields live in the attendee's complex_field_values
            # mapping rather than in their own attribute.
            complex_values = attendee.complex_field_values
            value = (
                complex_values.get(field.identifier, '') if complex_values else ''
            )
        else:
            value = getattr(attendee, field.identifier, '')
            # Values whose string form is "True"/"False" are exported as
            # "Yes"/"No"; everything else is passed through unchanged.
            as_text = str(value)
            if as_text == "True":
                value = "Yes"
            elif as_text == "False":
                value = "No"
        data[field.name] = value

    data['virtual_event_checkin_times'] = get_virtual_checkin_times(attendee.id)
    return data
def export_attendees_csv(attendees, custom_forms, attendee_form_dict):
    """Return one export dict per attendee, skipping empty results.

    Each attendee is converted with ``get_attendee_data``; falsy results are
    left out of the returned list.
    """
    converted = (
        get_attendee_data(attendee, custom_forms, attendee_form_dict)
        for attendee in attendees
    )
    return [row for row in converted if row]
def get_virtual_checkin_times(attendee_id: int):
    """
    Get the formatted 'room' check-in times of an attendee
    @param attendee_id: attendee_id
    @return: list of check-in timestamps formatted as "%Y-%m-%dT%H:%M:%S%z"
    """
    # Select 'room' check-ins whose ticket_holder_id matches the attendee
    # through ``.any(...)``.
    criteria = (
        VirtualCheckIn.ticket_holder_id.any(attendee_id),
        VirtualCheckIn.check_in_type == 'room',
    )
    stamp_format = "%Y-%m-%dT%H:%M:%S%z"
    formatted_times = []
    for record in VirtualCheckIn.query.filter(*criteria).all():
        formatted_times.append(record.check_in_at.strftime(stamp_format))
    return formatted_times
def export_sessions_csv(sessions):
    """Build the rows of the sessions CSV export.

    The first row is the header. Every session without ``deleted_at`` adds
    one row. Start and end times are converted to the event's timezone
    before formatting; abstracts and comments are passed through
    ``strip_tags``.

    :param sessions: iterable of session objects
    :return: list of rows, each row being a list of cell values
    """
    stamp = '%B %-d, %Y %H:%M %z'

    def local_time(moment, session):
        # The event timezone is only looked up when there is a time to show.
        if not moment:
            return ''
        zone = pytz.timezone(session.event.timezone)
        return moment.astimezone(zone).strftime(stamp)

    header = [
        'Session Title',
        'Session Starts At',
        'Session Ends At',
        'Session Speakers',
        'Speaker Emails',
        'Session Track',
        'Session Abstract Short',
        'Session Abstract Long',
        'Comment',
        'Created At',
        'Email Sent',
        'Level',
        'Status',
        'Session Type',
        'Talk Length',
        'Language',
        'Slides',
        'Audio',
        'Video',
        'Average Rating',
        'Number of Ratings',
    ]
    table = [header]

    for session in sessions:
        if session.deleted_at:
            continue

        title_cell = (
            session.title + ' (' + session.state + ')' if session.title else ''
        )
        starts_cell = local_time(session.starts_at, session)
        ends_cell = local_time(session.ends_at, session)

        # Speakers with an empty name or email are left out of that list.
        presenters = session.speakers or []
        names_cell = '; '.join(sp.name for sp in presenters if sp.name)
        emails_cell = '; '.join(sp.email for sp in presenters if sp.email)

        track = session.track
        kind = session.session_type
        table.append(
            [
                title_cell,
                starts_cell,
                ends_cell,
                names_cell,
                emails_cell,
                track.name if track and track.name else '',
                strip_tags(session.short_abstract) if session.short_abstract else '',
                strip_tags(session.long_abstract) if session.long_abstract else '',
                strip_tags(session.comments) if session.comments else '',
                session.created_at.strftime(stamp) if session.created_at else '',
                'Yes' if session.is_mail_sent else 'No',
                session.level,
                session.state,
                kind.name if kind and kind.name else '',
                kind.length if kind and kind.length else '',
                session.language or '',
                session.slides_url or '',
                session.audio_url or '',
                session.video_url or '',
                session.average_rating,
                session.rating_count,
            ]
        )
    return table
def export_sales_csv(sales):
    """Build the rows of the admin sales CSV export.

    The first row is the header. Every entry of ``sales`` without
    ``deleted_at`` adds one row holding the event name, owner name and
    email, event type, start date in the event's own timezone, and the
    ticket count and sales total for the 'completed', 'placed' and
    'pending' statuses.

    :param sales: iterable of event objects
    :return: list of rows, each row being a list of cell values
    """
    headers = [
        'Event Name',
        'Owner Name',
        'Owner Email',
        'Event Type',
        'Event Date',
        'Ticket (Completed)',
        'Sales (Completed)',
        'Ticket (Placed)',
        'Sales (Placed)',
        'Ticket (Pending)',
        'Sales (Pending)',
    ]
    rows = [headers]
    for sale in sales:
        if sale.deleted_at:
            continue

        # An event without an owner gets blank owner cells instead of
        # aborting the whole export with an AttributeError.
        owner = sale.owner
        column = [
            sale.name,
            owner.first_name if owner and owner.first_name else '',
            owner.email if owner else '',
            event_type(sale),
            sale.starts_at.astimezone(pytz.timezone(sale.timezone)).strftime(
                '%B %-d, %Y %H:%M %z'
            )
            if sale.starts_at
            else '',
        ]

        # Compute the summary once per event and reuse it for all six cells.
        # NOTE(review): assumes summary(sale) returns the same mapping on
        # repeated calls for the same event - confirm against its definition.
        totals = summary(sale)
        for status in ('completed', 'placed', 'pending'):
            column.append(totals[status]['ticket_count'])
            column.append(totals[status]['sales_total'])
        rows.append(column)
    return rows
def export_speakers_csv(speakers):
    """Build the rows of the speakers CSV export.

    The first row is the header; every speaker adds one row. The sessions
    cell lists each non-deleted session as "<title> (<state>)", separated by
    "; ". Falsy attribute values are exported as empty strings.

    :param speakers: iterable of speaker objects
    :return: list of rows, each row being a list of cell values
    """
    header = [
        'Speaker Name',
        'Speaker Email',
        'Speaker Session(s)',
        'Speaker Mobile',
        'Speaker Bio',
        'Speaker Organisation',
        'Speaker Position',
        'Speaker Experience',
        'Speaker Sponsorship Required',
        'Speaker City',
        'Speaker Country',
        'Speaker Website',
        'Speaker Twitter',
        'Speaker Facebook',
        'Speaker Github',
        'Speaker LinkedIn',
    ]
    # Attributes exported after the sessions cell, in header order.
    profile_fields = (
        'mobile',
        'short_biography',
        'organisation',
        'position',
        'speaking_experience',
        'sponsorship_required',
        'city',
        'country',
        'website',
        'twitter',
        'facebook',
        'github',
        'linkedin',
    )
    table = [header]
    for speaker in speakers:
        row = [speaker.name or '', speaker.email or '']
        if speaker.sessions:
            row.append(
                '; '.join(
                    talk.title + ' (' + talk.state + ')'
                    for talk in speaker.sessions
                    if not talk.deleted_at
                )
            )
        else:
            row.append('')
        row.extend(getattr(speaker, name) or '' for name in profile_fields)
        table.append(row)
    return table
def export_group_followers_csv(followers):
    """Build the rows of the group followers CSV export.

    The first row is the header; every follower adds one row with the
    user's public name, email, the formatted ``created_at`` of the follower
    record and the role returned by ``get_user_group_role``.

    :param followers: iterable of follower objects
    :return: list of rows, each row being a list of cell values
    """
    header = [
        'Public Profile Name',
        'Email',
        'Group Join Date',
        'Role (Owner, Organizer, Follower)',
    ]
    table = [header]
    for follower in followers:
        member = follower.user
        joined = follower.created_at
        table.append(
            [
                member.public_name or '',
                member._email or '',
                joined.strftime('%B %-d, %Y %H:%M %z') if joined else '',
                get_user_group_role(follower.user_id, follower.group_id),
            ]
        )
    return table