Django后台任务批量导出、批量导入

需求

  1. 将耗时任务进行后台处理,如数据导入(xls)、数据导出(csv)

实现过程

整体实现过程中,
数据导入主要使用pandas处理数据的每一行,需求为每次导出的数据都可能发生变化,需要进行判定当次数据是否为None,如果是,则不替换数据。
数据导出主要使用了django-import-export实现,通过继承ExportMixin并改写某些代码完成数据的提取

数据导入:

  1. 处理思路:
    将post上传的文件存储至数据库中,数据库记录文件位置,文件存放在media下的指定文件夹中
    创建线程任务进行后台处理,同时页面返回相应提示,并在当前任务完成前不再允许上传第二个任务(内存太小)
  2. 具体实现:
    views.py:
# Threads started by ImportDataView.post; finished ones are reaped in
# ImportDataView.get.
task_list = []


class ImportDataView(LoginRequiredMixin, View):
    """Upload a spreadsheet and import it in a background thread."""

    # Not used by this view; kept so existing references to the attribute
    # keep working.
    model_instances = []

    def get(self, request):
        """Render the upload page.

        The template receives ``form`` (an unbound upload form) and
        ``have_task``, which is ``True`` while at least one import thread is
        still running and ``None`` otherwise.
        """
        # Reap every finished thread. Checking only the first entry while
        # popping the last one could drop a thread that is still running, and
        # the flag must not stay set once the work is done.
        for worker in list(task_list):
            if not worker.is_alive():
                worker.join()
                task_list.remove(worker)
        have_task = True if task_list else None

        form = forms.UploadFileForm()
        return render(
            request,
            'main_app/import_data.html',
            {'form': form, "have_task": have_task},
        )

    def post(self, request):
        """Store the uploaded file and start a background thread to import it.

        Always redirects back to the import page; the outcome is reported
        through the messages framework.
        """
        form = forms.UploadFileForm(request.POST, request.FILES)
        if not form.is_valid():
            # A rejected upload is an error, not a success.
            messages.error(request, "上传失败")
            return redirect("phone:import")

        upload = models.ImportData(file=request.FILES['file'])
        upload.save()
        worker = threading.Thread(target=tasks.parse_xls, args=[upload.id, True])
        worker.start()
        task_list.append(worker)

        messages.success(request, "上传成功")
        return redirect("phone:import")

tasks.py:

def parse_xls(file_id, is_update):
    """Read an uploaded spreadsheet and write its rows to the database.

    Args:
        file_id: Primary key of the ``models.ImportData`` record whose
            ``file`` is read with ``pandas.read_excel``.
        is_update: When true, every row is handed to ``file2db``. Otherwise
            (deprecated path) rows are collected through ``all2db`` and
            inserted with a single ``bulk_create``.
    """
    global model_instances
    print("数据入库")
    import_record = models.ImportData.objects.get(id=file_id)

    sheet = pandas.read_excel(import_record.file)
    write_import_status(import_record, "正在处理")

    if is_update:
        # Update path: one file2db call per row, with the import record
        # passed along as its second argument.
        sheet.apply(file2db, axis=1, args=(import_record,))
    else:
        # Insert-only path, deprecated.
        sheet.apply(all2db, axis=1)
        models.MainTable.objects.bulk_create(model_instances, ignore_conflicts=True)
        model_instances = []

    write_import_status(import_record, "是")

    print("数据入库完毕")

def file2db(row, log_obj):
    """Create or update the MainTable record for one spreadsheet row.

    The record is fetched (or created) by the row's "号码" value. Every other
    cell that is not missing is copied onto the model attribute named by
    ``map_field``; missing cells (NaN) are skipped so the stored value is
    left untouched. The literal string "None" clears the attribute.

    Any exception is passed to ``write_import_log`` and the attribute changes
    for that row are not saved.

    Args:
        row: One spreadsheet row, indexed by column header.
        log_obj: Passed through to ``write_import_log`` when the row fails.
    """
    try:
        obj, _created = models.MainTable.objects.get_or_create(phone=row["号码"])
        for k, v in row.items():
            if pandas.isna(v) or k == "号码":
                continue
            if v == "None":
                v = None
            elif k in ("星级", "信用积分", "前三月平均消费"):
                if isinstance(v, float) and v.is_integer():
                    # pandas reads an integer column as float64 as soon as
                    # one cell in it is blank, so 5 arrives as 5.0. Keep it
                    # instead of overwriting the stored value with None.
                    v = int(v)
                elif not isinstance(v, int):
                    v = None
            try:
                setattr(obj, map_field[k], v)
            except Exception as e:
                print(f"k:{k},v:{v},phone:{row['号码']},row")
                write_import_log(log_obj, row, e)
                return
        obj.save()
    except Exception as e:
        write_import_log(log_obj, row, e)
        return

def all2db(row):
    """Build an unsaved MainTable instance for one row and queue it.

    The instance only gets its ``phone`` set (from the "号码" column) and is
    appended to the module-level ``model_instances`` list.

    Returns:
        The newly built, unsaved instance.
    """
    pending = models.MainTable(phone=row["号码"])
    model_instances.append(pending)
    return pending

数据导出,此处用django-import-export包进行导出

views.py

# Threads started by ExportDataView2.get; finished ones are reaped in
# ExportDataView2.check_task_list.
export_task_list = []


class ExportDataView2(LoginRequiredMixin, CustomFilter, ExportMixin, View):
    """Start a filtered export of MainTable in a background thread."""

    model = models.MainTable  # for ExportMixin
    to_encoding = "utf-8-sig"

    def get(self, request):
        """Start a background export if a filter is given and none is running.

        Always redirects to the index page; the outcome is reported through
        the messages framework.
        """
        # Position of each format in the list returned by
        # get_export_formats():
        # CSV, XLS, XLSX, TSV, ODS, JSON, YAML, HTML,
        # 0,   1,    2,    3,    4,   5,   6,    7
        file_format = {"xlsx": 2, "csv": 0}
        file_type = "csv"

        # Evaluate once so the decision and the message below agree.
        task_running = self.check_task_list()

        form = forms.SearchForm(self.request.GET)
        if form.is_valid() and not task_running:
            qs, is_filter = self.get_query_set(form.cleaned_data, models.MainTable.objects)
            if is_filter:
                worker = threading.Thread(
                    target=tasks.make_export,
                    args=[qs, file_format[file_type], request, form.cleaned_data],
                )
                worker.start()
                export_task_list.append(worker)

                messages.success(request, "正在生成")
                return redirect("phone:index")

        msg = "未检测到过滤条件,不要以此方式导出全部数据"
        if task_running:
            msg = "当前正在进行任务导出,转至后台管理查看"
        # The export was refused, so this is a warning rather than a success.
        messages.warning(request, msg)
        return redirect("phone:index")

    def get_export_data(self, file_format, queryset, *args, **kwargs):
        """Return the ``file_format`` representation of ``queryset`` as bytes.

        Args:
            file_format: Format instance whose ``export_data`` renders the
                dataset.
            queryset: Rows to export through ``admin.MainTableResource``.
            **kwargs: Must contain ``request``; the remaining arguments are
                forwarded to the resource's ``export``.

        Raises:
            PermissionDenied: If ``has_export_permission(request)`` is false.
            KeyError: If ``request`` is not passed as a keyword argument.
        """
        request = kwargs.pop("request")
        if not self.has_export_permission(request):
            raise PermissionDenied

        data = admin.MainTableResource().export(queryset, *args, **kwargs)

        export_data = file_format.export_data(data)
        # Binary formats such as XLSX already produce bytes, which have no
        # .encode(); only text output needs to be encoded.
        if isinstance(export_data, str):
            return export_data.encode(self.to_encoding)
        return export_data

    def check_task_list(self):
        """Return True while a background export thread is still running.

        Finished threads are joined and removed from ``export_task_list``.
        """
        # Remove exactly the threads that have finished, rather than checking
        # the first entry and popping the last.
        for worker in list(export_task_list):
            if not worker.is_alive():
                worker.join()
                export_task_list.remove(worker)
        return bool(export_task_list)

admin.py,用于重置导出文件的标题。ExportMixin会根据model找到ModelResource:

class MainTableResource(resources.ModelResource):
    """Export resource for MainTable that renames the column headers."""

    # Each declared field maps a model attribute to the header used in the
    # exported file.
    phone = Field(attribute="phone", column_name="号码")
    package_name = Field(attribute="package_name", column_name="套餐名称")
    is_black = Field(attribute="is_black", column_name="是否黑名单")

    class Meta:
        model = models.MainTable
        # Leave the "id" field out of the resource.
        exclude = ("id",)

tasks.py:

def make_export(dataset, file_type_num, request, forms):
    """Render an export file and attach it to a new ExportData record.

    Args:
        dataset: Passed to ``ExportDataView2.get_export_data`` as the
            queryset to export.
        file_type_num: Index into the list returned by
            ``get_export_formats()``.
        request: Forwarded to ``get_export_data`` and
            ``get_export_filename``.
        forms: Search parameters; formatted by ``format_search_log`` and
            stored in the record's ``log`` field.
    """
    # Save the record first, without a file; the file is attached once the
    # export has been rendered.
    record = models.ExportData(file=None, log=format_search_log(forms))
    record.save()

    exporter = views.ExportDataView2()
    fmt = exporter.get_export_formats()[file_type_num]()
    payload = exporter.get_export_data(fmt, dataset, request=request)  # slow step
    filename = exporter.get_export_filename(request, dataset, fmt)

    record.file = File(BytesIO(payload), name=filename)
    record.save()
原文地址:https://www.cnblogs.com/lisicn/p/15397535.html