zip~~~~~~~~~~~

 // Combine the member-list value with the fetched group detail and hand the built
 // Groups entity to saveOrUpdateGroups.
 // NOTE(review): the value returned by saveOrUpdateGroups(...) is discarded here; if that
 // method returns a lazy Mono it would never be subscribed — confirm against its signature.
 Mono.zip(memberList, batchClient.getGroupDetail(toGetDetailsJson))
                             .doOnNext(pair -> {
                                 // T2 is the detail payload; the group data is read from its "group_chat" object.
                                 JSONObject chatInfo = pair.getT2().getJSONObject("group_chat");
                                 System.out.println("status:" + status);
                                 // T1 (the member list string) is passed straight through to the entity builder.
                                 saveOrUpdateGroups(Groups.buildGroupsByDetailsInfo(
                                         chatInfo, groupStateStr, status, batchId, pair.getT1()));
                             })
                             // Subscribed without a value or error consumer (fire-and-forget).
                             .subscribe();
        // Load the stored member-list record for this group's chat id.
        Mono<GroupsMemberList> groupsMemberListByChatId = groupsMemberListRepository.findGroupsMemberListByChatId(groups.getChatId());
        // NOTE(review): Mono.zip completes empty when either source is empty, so a group without
        // a stored member-list record is never saved by this pipeline — confirm that is intended.
        return Mono.zip(ownerName, groupsMemberListByChatId).flatMap(
                tuple -> {
                    String ownername = tuple.getT1();
                    log.info("ownername::::::::::{}", ownername);
                    GroupsMemberList groupsMemberList = tuple.getT2();
                    // Carry over the previously stored member list and set the owner name before saving.
                    groups.setLastMemberList(groupsMemberList.getLastMemberList());
                    groups.setGroupMaster(ownername);
                    return externalGroupsRepository.save(groups)
                            .doOnSubscribe(s -> log.info("客户群保存或更新{}", groups.getChatId()))
                            .doOnNext(saved -> log.info("{}", saved))
                            // onErrorResume replaces onErrorContinue + nested subscribe(): the fallback
                            // insert is now part of the returned Mono, so callers observe its result
                            // and its failure instead of a detached fire-and-forget subscription.
                            .onErrorResume(error -> {
                                // getLocalizedMessage() may be null; guard before startsWith.
                                String localizedMessage = error.getLocalizedMessage();
                                if (localizedMessage == null
                                        || !localizedMessage.startsWith("Failed to update table")) {
                                    // Not the "update failed" case: propagate instead of silently dropping it.
                                    return Mono.error(error);
                                }
                                log.warn("更新客户群失败,执行新增:{}", groups.getChatId());
                                // The record is new, so lastMemberList is filled with the same value as memberList.
                                groups.setLastMemberList(groups.getMemberList());
                                return externalGroupsRepository.save(groups.setAsNew())
                                        .retry(1)
                                        // Trailing throwable argument keeps the stack trace in the log.
                                        .doOnError(insertError -> log.error("更新后失败执行新增后也异常了:{},chatId:{}",
                                                insertError, groups.getChatId(), insertError));
                            })
                            // Re-subscribe once if the save (including the insert fallback) still fails.
                            .retry(1);
                }
        );
原文地址:https://www.cnblogs.com/ukzq/p/13885489.html