/*
** Copyright (C) 2001-2026 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "reporter.h"
#include "report_protocol.h"

#include "../db_lengths_constants.h"

#include "zbxtimekeeper.h"
#include "zbxthreads.h"
#include "zbxalerter.h"
#include "zbxcrypto.h"
#include "zbxself.h"
#include "zbxnix.h"
#include "zbxnum.h"
#include "zbxtime.h"
#include "zbxlog.h"
#include "zbxalgo.h"
#include "zbxcacheconfig.h"
#include "zbxdb.h"
#include "zbxdbhigh.h"
#include "zbxexpr.h"
#include "zbxipcservice.h"
#include "zbxjson.h"
#include "zbxstr.h"
#include "zbx_expression_constants.h"

/* NOTE(review): presumably the values of the report status field - confirm against the database schema */
#define ZBX_REPORT_STATUS_ENABLED	0
#define ZBX_REPORT_STATUS_DISABLED	1

/* flags marking the report fields that have pending changes to be written to database */
#define ZBX_REPORT_UPDATE_LASTSENT	0x0001
#define ZBX_REPORT_UPDATE_STATE		0x0002
#define ZBX_REPORT_UPDATE_ERROR		0x0004
/* all report update flags combined */
#define ZBX_REPORT_UPDATE		(ZBX_REPORT_UPDATE_LASTSENT | ZBX_REPORT_UPDATE_STATE | ZBX_REPORT_UPDATE_ERROR)

/* report states; lastsent is marked for update for every state except ZBX_REPORT_STATE_ERROR */
#define ZBX_REPORT_STATE_SUCCESS	1
#define ZBX_REPORT_STATE_ERROR		2
#define ZBX_REPORT_STATE_SUCCESS_INFO	3

/* report manager data */
typedef struct
{
	/* 'url' and 'session_key' values read from the settings table, NULL until the first settings update */
	char			*zabbix_url;
	char			*session_key;

	/* the IPC service */
	zbx_ipc_service_t	ipc;

	/* the next writer index to be assigned to new IPC service clients */
	int			next_writer_index;

	/* identifier of the last batchid (autogenerated) */
	zbx_uint64_t		last_batchid;

	/* report writer vector, created during manager initialization */
	zbx_vector_ptr_t	writers;
	/* writers are pushed to this queue when they register with the manager */
	zbx_queue_ptr_t		free_writers;

	/* cached reports (zbx_rm_report_t), searched by reportid */
	zbx_hashset_t		reports;
	/* NOTE(review): presumably zbx_rm_batch_t entries indexed by batchid - confirm */
	zbx_hashset_t		batches;

	/* identifiers of reports having changes that must be flushed to database */
	zbx_vector_uint64_t	flush_queue;

	/* reports compared by their nextcheck, with direct access by reportid */
	zbx_binary_heap_t	report_queue;

	/* NOTE(review): presumably jobs waiting for a free writer - confirm */
	zbx_list_t		job_queue;
}
zbx_rm_t;

/* report recipient, stored in the report users and usergroups vectors */
typedef struct
{
	/* recipient identifier (NOTE(review): presumably userid or usrgrpid depending on the vector - confirm) */
	zbx_uint64_t	id;
	/* NOTE(review): presumably the user on whose behalf the dashboard is accessed - confirm */
	zbx_uint64_t	access_userid;
}
zbx_rm_recipient_t;

ZBX_VECTOR_DECL(recipient, zbx_rm_recipient_t)
ZBX_VECTOR_IMPL(recipient, zbx_rm_recipient_t)

/* cached report configuration and runtime state */
typedef struct
{
	zbx_uint64_t		reportid;
	zbx_uint64_t		userid;
	zbx_uint64_t		dashboardid;
	char			*name;
	char			*timezone;
	/* last error/info message, flushed to the 'info' field of the report table */
	char			*error;
	unsigned char		period;
	unsigned char		cycle;
	unsigned char		weekdays;
	unsigned char		status;
	int			start_time;
	int			state;
	/* pending database updates, a combination of ZBX_REPORT_UPDATE_* flags */
	zbx_uint32_t		flags;
	/* time used to order the report queue, 0 - report is not queued */
	int			nextcheck;
	/* activity period bounds, 0 - the bound is not set */
	int			active_since;
	int			active_till;
	int			lastsent;

	/* report parameters as name (first), value (second) string pairs */
	zbx_vector_ptr_pair_t	params;
	zbx_vector_recipient_t	usergroups;
	zbx_vector_recipient_t	users;
	/* identifiers of excluded users, kept sorted */
	zbx_vector_uint64_t	users_excl;
}
zbx_rm_report_t;

/* web session created for viewing a dashboard, released with rm_session_close() */
typedef struct
{
	/* user the session was started for */
	zbx_uint64_t	userid;
	/* session identifier, stored in the sessionid field of the sessions table */
	char		*sid;
	/* zbx_session cookie created from the session identifier */
	char		*cookie;
}
zbx_rm_session_t;

/* job to be processed by a report writer, released with rm_job_free() */
typedef struct
{
	/* user the web session of this job was started for */
	zbx_uint64_t		access_userid;
	zbx_uint64_t		batchid;
	/* dashboard print URL including the report time range */
	char			*url;
	/* report attachment name */
	char			*report_name;
	unsigned char		is_test_report;
	/* recipient user identifiers */
	zbx_vector_uint64_t	userids;
	/* copy of viewing and processing parameters as name, value string pairs */
	zbx_vector_ptr_pair_t	params;
	/* web session owned by the job, closed when the job is freed */
	zbx_rm_session_t	*session;

	/* optional IPC client, released when the job is freed if it is not NULL */
	zbx_ipc_client_t	*client;
}
zbx_rm_job_t;

/* batch of jobs, cleaned with rm_batch_clean() */
typedef struct
{
	zbx_uint64_t		batchid;
	zbx_uint64_t		reportid;
	/* NOTE(review): presumably counters of failed, sent and total jobs of the batch - confirm */
	int			error_num;
	int			sent_num;
	int			total_num;
	/* dynamically allocated information text, freed when the batch is cleaned; */
	/* NOTE(review): info_alloc and info_offset presumably track its allocated and used size - confirm */
	char			*info;
	size_t			info_alloc;
	size_t			info_offset;
	/* jobs (zbx_rm_job_t) owned by the batch, freed with rm_job_free() when the batch is cleaned */
	zbx_vector_ptr_t	jobs;
}
zbx_rm_batch_t;

/* report writer slot, allocated during manager initialization */
typedef struct
{
	/* the connected report writer client, NULL until the writer registers */
	zbx_ipc_client_t	*client;

	/* NOTE(review): presumably the job currently processed by the writer - confirm */
	zbx_rm_job_t		*job;
}
zbx_rm_writer_t;

/******************************************************************************
 *                                                                            *
 * Purpose: finds report writer by its connected IPC client                   *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             client  - [IN] IPC client to look for                          *
 *                                                                            *
 * Return value: writer owning the client or NULL if none matches             *
 *                                                                            *
 ******************************************************************************/
static	zbx_rm_writer_t	*rm_get_writer(zbx_rm_t *manager, const zbx_ipc_client_t *client)
{
	zbx_rm_writer_t	*found = NULL;
	int		idx = 0;

	/* linear scan over all writer slots, stops at the first match */
	while (NULL == found && idx < manager->writers.values_num)
	{
		zbx_rm_writer_t	*candidate = (zbx_rm_writer_t *)manager->writers.values[idx++];

		if (client == candidate->client)
			found = candidate;
	}

	return found;
}

static int	rm_report_compare_nextcheck(const void *d1, const void *d2)
{
	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;

	return ((zbx_rm_report_t *)e1->data)->nextcheck - ((zbx_rm_report_t *)e2->data)->nextcheck;
}

/* releases resources held by the report fields, the report structure itself is not freed */
static void	rm_report_clean(zbx_rm_report_t *report)
{
	/* recipients */
	zbx_vector_uint64_destroy(&report->users_excl);
	zbx_vector_recipient_destroy(&report->users);
	zbx_vector_recipient_destroy(&report->usergroups);

	/* parameters */
	report_destroy_params(&report->params);

	/* dynamically allocated strings */
	zbx_free(report->error);
	zbx_free(report->timezone);
	zbx_free(report->name);
}

/******************************************************************************
 *                                                                            *
 * Purpose: closes web session and releases its memory                        *
 *                                                                            *
 * Parameters: session - [IN] session to close, freed by this function        *
 *                                                                            *
 * Comments: The session row is removed from the database so that it cannot   *
 *           be used to authenticate a user in Zabbix frontend. A failure to  *
 *           delete it is only logged.                                        *
 *                                                                            *
 ******************************************************************************/
static void	rm_session_close(zbx_rm_session_t *session)
{
	if (zbx_db_execute("delete from sessions where sessionid='%s'", session->sid) < ZBX_DB_OK)
	{
		zabbix_log(LOG_LEVEL_WARNING, "Report manager failed to delete web session from database, user ID = "
				ZBX_FS_UI64 ".", session->userid);
	}

	/* the session is released even if the database row could not be removed */
	zbx_free(session->cookie);
	zbx_free(session->sid);
	zbx_free(session);
}

/* frees job together with everything it owns: IPC client reference (if set), web session, */
/* parameters, recipient identifiers and strings                                           */
static void	rm_job_free(void *ptr)
{
	zbx_rm_job_t	*job = (zbx_rm_job_t *)ptr;

	if (NULL != job->client)
		zbx_ipc_client_release(job->client);

	/* removes the session from database and frees it */
	rm_session_close(job->session);

	report_destroy_params(&job->params);
	zbx_vector_uint64_destroy(&job->userids);

	zbx_free(job->url);
	zbx_free(job->report_name);
	zbx_free(job);
}

/* releases resources held by the batch: information text and all jobs; the batch structure itself is not freed */
static void	rm_batch_clean(zbx_rm_batch_t *batch)
{
	zbx_free(batch->info);

	zbx_vector_ptr_clear_ext(&batch->jobs, rm_job_free);
	zbx_vector_ptr_destroy(&batch->jobs);
}

/******************************************************************************
 *                                                                            *
 * Purpose: initializes report manager                                        *
 *                                                                            *
 * Parameters: manager             - [IN] manager to initialize               *
 *             get_config_forks_cb - [IN] returns configured process count    *
 *             error               - [OUT] error message on failure           *
 *                                                                            *
 * Return value: result of zbx_ipc_service_start(), FAIL if the service       *
 *               could not be started                                         *
 *                                                                            *
 * Comments: Every writer slot is created with no client and no job.          *
 *                                                                            *
 ******************************************************************************/
static int	rm_init(zbx_rm_t *manager, zbx_get_config_forks_f get_config_forks_cb, char **error)
{
	int	ret;
	int	writers_num = get_config_forks_cb(ZBX_PROCESS_TYPE_REPORTWRITER);

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() writers:%d", __func__, writers_num);

	/* nothing else is initialized if the IPC service cannot be started */
	if (FAIL == (ret = zbx_ipc_service_start(&manager->ipc, ZBX_IPC_SERVICE_REPORTER, error)))
		goto out;

	zbx_vector_ptr_create(&manager->writers);
	zbx_queue_ptr_create(&manager->free_writers);
	zbx_hashset_create(&manager->reports, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_hashset_create(&manager->batches, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_binary_heap_create(&manager->report_queue, rm_report_compare_nextcheck, ZBX_BINARY_HEAP_OPTION_DIRECT);

	zbx_vector_uint64_create(&manager->flush_queue);

	zbx_list_create(&manager->job_queue);

	manager->next_writer_index = 0;
	manager->session_key = NULL;
	manager->zabbix_url = NULL;
	manager->last_batchid = 0;

	/* preallocate one slot per configured report writer, clients are attached when writers register */
	for (int i = 0; i < writers_num; i++)
	{
		zbx_rm_writer_t	*writer = (zbx_rm_writer_t *)zbx_malloc(NULL, sizeof(zbx_rm_writer_t));

		writer->client = NULL;
		/* zbx_malloc() does not zero the memory, so the job pointer must be reset explicitly */
		writer->job = NULL;
		zbx_vector_ptr_append(&manager->writers, writer);
	}
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: registers report writer                                           *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             client  - [IN] connected writer                                *
 *             message - [IN] received message                                *
 *                                                                            *
 * Comments: The message data starts with the parent process id of the        *
 *           writer. Clients with a different parent than this process        *
 *           are disconnected.                                                *
 *                                                                            *
 ******************************************************************************/
static void	rm_register_writer(zbx_rm_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message)
{
	pid_t	writer_ppid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* NOTE(review): the message size is not checked before copying - confirm that it is validated elsewhere */
	memcpy(&writer_ppid, message->data, sizeof(writer_ppid));

	if (getppid() == writer_ppid)
	{
		zbx_rm_writer_t	*slot;

		/* all preallocated writer slots are already taken */
		if (manager->writers.values_num == manager->next_writer_index)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			exit(EXIT_FAILURE);
		}

		slot = (zbx_rm_writer_t *)manager->writers.values[manager->next_writer_index++];
		slot->client = client;

		zbx_queue_ptr_push(&manager->free_writers, slot);
	}
	else
	{
		zbx_ipc_client_close(client);
		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: formats broken-down time for use in URL query fields              *
 *                                                                            *
 * Parameters: tm - [IN] time to format                                       *
 *                                                                            *
 * Return value: URL encoded "YYYY-MM-DD hh:mm:ss" string                     *
 *                                                                            *
 * Comments: The result is kept in a static buffer that is overwritten by     *
 *           the next call.                                                   *
 *                                                                            *
 ******************************************************************************/
static char	*rm_time_to_urlfield(const struct tm *tm)
{
	/* holds "YYYY-MM-DD%20hh%3Amm%3Ass" with the terminating zero */
	static char	field[26];
	const int	year = tm->tm_year + 1900, month = tm->tm_mon + 1;

	zbx_snprintf(field, sizeof(field), "%02d-%02d-%02d%%20%02d%%3A%02d%%3A%02d", year, month, tm->tm_mday,
			tm->tm_hour, tm->tm_min, tm->tm_sec);

	return field;
}

/******************************************************************************
 *                                                                            *
 * Purpose: creates zbx_session cookie for frontend authentication            *
 *                                                                            *
 * Parameters: manager   - [IN]                                               *
 *             sessionid - [IN]                                               *
 *                                                                            *
 * Return value: base64 encoded cookie, must be freed by caller               *
 *                                                                            *
 * Comments: The cookie json holds the session id and a HMAC-SHA256           *
 *           signature made with the cached session key.                      *
 *                                                                            *
 ******************************************************************************/
static char	*report_create_cookie(zbx_rm_t *manager, const char *sessionid)
{
	struct zbx_json	json;
	char		*digest, *signature, *encoded = NULL;

	zbx_json_init(&json, 512);
	zbx_json_addstring(&json, ZBX_PROTO_TAG_SESSIONID, sessionid, ZBX_JSON_TYPE_STRING);

	/* the signature is calculated over the json contents before the signature tag is added */
	if (SUCCEED == zbx_hmac(ZBX_HASH_SHA256, manager->session_key, strlen(manager->session_key), json.buffer,
			json.buffer_size, &digest))
	{
		/* the signature is added as raw json, so it must be quoted here */
		signature = zbx_dsprintf(NULL, "\"%s\"", digest);
		zbx_free(digest);
	}
	else
	{
		THIS_SHOULD_NEVER_HAPPEN;
		signature = zbx_strdup(NULL, "");
	}

	zbx_json_addraw(&json, ZBX_PROTO_TAG_SIGN, signature);
	zbx_base64_encode_dyn(json.buffer, &encoded, json.buffer_size);

	zbx_free(signature);
	zbx_json_clean(&json);

	return encoded;
}

/******************************************************************************
 *                                                                            *
 * Purpose: starts new session for specified user                             *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             userid  - [IN]                                                 *
 *                                                                            *
 * Return value: new session, to be released with rm_session_close()          *
 *                                                                            *
 * Comments: The session is written to database. A failed insert is only      *
 *           logged and the session is still returned.                        *
 *                                                                            *
 ******************************************************************************/
static zbx_rm_session_t	*rm_session_start(zbx_rm_t *manager, zbx_uint64_t userid)
{
	zbx_rm_session_t	*new_session = (zbx_rm_session_t *)zbx_malloc(NULL, sizeof(zbx_rm_session_t));
	zbx_db_insert_t		insert;

	new_session->sid = zbx_create_token(0);
	new_session->cookie = report_create_cookie(manager, new_session->sid);
	new_session->userid = userid;

	zbx_db_insert_prepare(&insert, "sessions", "sessionid", "userid", "lastaccess", "status", NULL);
	zbx_db_insert_add_values(&insert, new_session->sid, userid, (int)time(NULL), ZBX_SESSION_ACTIVE);

	if (SUCCEED != zbx_db_insert_execute(&insert))
	{
		zabbix_log(LOG_LEVEL_WARNING, "Report manager failed to write web session to database, user ID = "
				ZBX_FS_UI64 ".", new_session->userid);
	}

	zbx_db_insert_clean(&insert);

	return new_session;
}

/******************************************************************************
 *                                                                            *
 * Purpose: flushes pending report lastsent, state and info changes to        *
 *          database                                                          *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 * Comments: Reports queued without pending update flags are skipped, as      *
 *           an update statement without assignments is not valid SQL.        *
 *                                                                            *
 ******************************************************************************/
static void	rm_db_flush_reports(zbx_rm_t *manager)
{
	char	*sql = NULL;
	size_t	sql_alloc = 0, sql_offset = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (0 == manager->flush_queue.values_num)
		goto out;

	/* a report can be queued several times between flushes, it must be written only once */
	zbx_vector_uint64_sort(&manager->flush_queue, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_uint64_uniq(&manager->flush_queue, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zbx_db_begin();

	for (int i = 0; i < manager->flush_queue.values_num; i++)
	{
		zbx_rm_report_t	*report;
		char		delim = ' ';

		if (NULL == (report = (zbx_rm_report_t *)zbx_hashset_search(&manager->reports,
				&manager->flush_queue.values[i])))
		{
			/* report was removed */
			continue;
		}

		if (0 == (report->flags & ZBX_REPORT_UPDATE))
		{
			/* nothing is pending for this report (for example it was removed and cached again */
			/* after being queued), "update report set where ..." would be a malformed query   */
			continue;
		}

		zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "update report set");

		/* the first assignment is separated by space, the following ones by comma */
		if (0 != (report->flags & ZBX_REPORT_UPDATE_LASTSENT))
		{
			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "%clastsent=%d", delim, (int)time(NULL));
			delim = ',';
		}

		if (0 != (report->flags & ZBX_REPORT_UPDATE_STATE))
		{
			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "%cstate=%d", delim, report->state);
			delim = ',';
		}

		if (0 != (report->flags & ZBX_REPORT_UPDATE_ERROR))
		{
			char	*esc, *empty = "";

			/* cleared error is stored as empty string */
			if (NULL != report->error)
				esc = zbx_db_dyn_escape_string_len(report->error, REPORT_ERROR_LEN);
			else
				esc = empty;

			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "%cinfo='%s'", delim, esc);

			if (esc != empty)
				zbx_free(esc);
		}

		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " where reportid=" ZBX_FS_UI64 ";\n",
				report->reportid);

		zbx_db_execute_overflowed_sql(&sql, &sql_alloc, &sql_offset);

		report->flags = 0;
	}

	(void)zbx_db_flush_overflowed_sql(sql, sql_offset);

	zbx_db_commit();

	/* recreate flush queue to release memory */
	zbx_vector_uint64_destroy(&manager->flush_queue);
	zbx_vector_uint64_create(&manager->flush_queue);

	zbx_free(sql);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: calculates report range from report time and period               *
 *                                                                            *
 * Parameters: report_time - [IN] report writing time                         *
 *             period      - [IN] dashboard period                            *
 *             from        - [OUT] report start time                          *
 *             to          - [OUT] report end time                            *
 *                                                                            *
 * Return value: SUCCEED - report range was calculated successfully           *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 ******************************************************************************/
static int	rm_get_report_range(int report_time, unsigned char period, struct tm *from, struct tm *to)
{
	struct tm	*tm;
	time_t		from_time = report_time;
	zbx_time_unit_t	period2unit[] = {ZBX_TIME_UNIT_DAY, ZBX_TIME_UNIT_WEEK, ZBX_TIME_UNIT_MONTH,
						ZBX_TIME_UNIT_YEAR};

	if (ARRSIZE(period2unit) <= period)
		return FAIL;

	tm = zbx_localtime(&from_time, NULL);

	*to = *tm;
	zbx_tm_round_down(to, period2unit[period]);

	*from = *to;
	zbx_tm_sub(from, 1, period2unit[period]);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: makes report attachment name based on report name and timestamp   *
 *                                                                            *
 * Parameters: name        - [IN] report name                                 *
 *             report_time - [IN]                                             *
 *                                                                            *
 * Return value: allocated "<name>_YYYY-MM-DD_hh-mm.pdf" file name where      *
 *               spaces, tabs, ':', '/' and '\' in name become '_'            *
 *                                                                            *
 ******************************************************************************/
static char	*rm_get_report_name(const char *name, int report_time)
{
	time_t		timestamp = report_time;
	const struct tm	*tm;
	char		*sanitized, *result;

	sanitized = zbx_strdup(NULL, name);

	/* replace whitespace and path/drive separators in the copy */
	for (char *p = sanitized; '\0' != *p; p++)
	{
		if (NULL != strchr(" \t:/\\", *p))
			*p = '_';
	}

	tm = zbx_localtime(&timestamp, NULL);
	result = zbx_dsprintf(NULL, "%s_%04d-%02d-%02d_%02d-%02d.pdf", sanitized, tm->tm_year + 1900,
			tm->tm_mon + 1, tm->tm_mday, tm->tm_hour, tm->tm_min);

	zbx_free(sanitized);

	return result;
}

/******************************************************************************
 *                                                                            *
 * Purpose: creates new job to be processed by report writers                 *
 *                                                                            *
 * Parameters: manager       - [IN]                                           *
 *             report_name   - [IN]                                           *
 *             dashboardid   - [IN] dashboard to view                         *
 *             access_userid - [IN] user accessing the dashboard              *
 *             report_time   - [IN]                                           *
 *             period        - [IN] report period                             *
 *             userids       - [IN] recipient user identifiers                *
 *             userids_num   - [IN] number of recipients                      *
 *             params        - [IN] viewing and processing parameters         *
 *             error         - [OUT] error message, set on failure            *
 *                                                                            *
 * Return value: created job or NULL if frontend URL is not configured or     *
 *               report range cannot be calculated                            *
 *                                                                            *
 * Comments: A web session is started for access_userid and owned by job.     *
 *                                                                            *
 ******************************************************************************/
static zbx_rm_job_t	*rm_create_job(zbx_rm_t *manager, const char *report_name, zbx_uint64_t dashboardid,
		zbx_uint64_t access_userid, int report_time, unsigned char period, zbx_uint64_t *userids,
		int userids_num, const zbx_vector_ptr_pair_t *params, char **error)
{
	size_t		url_alloc = 0, url_offset = 0;
	struct tm	from, to;

	/* the URL is NULL until settings are loaded for the first time, treat it as not configured */
	if (NULL == manager->zabbix_url || '\0' == *manager->zabbix_url)
	{
		*error = zbx_strdup(NULL, "The Frontend URL has not been configured");
		return NULL;
	}

	if (SUCCEED != rm_get_report_range(report_time, period, &from, &to))
	{
		*error = zbx_strdup(NULL, "invalid report time or period");
		return NULL;
	}

	zbx_rm_job_t	*job = (zbx_rm_job_t *)zbx_malloc(NULL, sizeof(zbx_rm_job_t));
	memset(job, 0, sizeof(zbx_rm_job_t));

	job->report_name = rm_get_report_name(report_name, report_time);

	/* the job keeps its own copies of parameter names and values */
	zbx_vector_ptr_pair_create(&job->params);
	for (int i = 0; i < params->values_num; i++)
	{
		zbx_ptr_pair_t	pair;

		pair.first = zbx_strdup(NULL, (const char *)params->values[i].first);
		pair.second = zbx_strdup(NULL, (const char *)params->values[i].second);
		zbx_vector_ptr_pair_append(&job->params, pair);
	}

	zbx_vector_uint64_create(&job->userids);
	zbx_vector_uint64_append_array(&job->userids, userids, userids_num);

	/* rm_time_to_urlfield() returns a static buffer, so each field is appended by a separate call */
	zbx_snprintf_alloc(&job->url, &url_alloc, &url_offset,
			"%s/zabbix.php?action=dashboard.print&dashboardid=" ZBX_FS_UI64,
			manager->zabbix_url, dashboardid);
	zbx_snprintf_alloc(&job->url, &url_alloc, &url_offset, "&from=%s", rm_time_to_urlfield(&from));
	zbx_snprintf_alloc(&job->url, &url_alloc, &url_offset, "&to=%s", rm_time_to_urlfield(&to));

	job->session = rm_session_start(manager, access_userid);

	job->access_userid = access_userid;

	return job;
}

/******************************************************************************
 *                                                                            *
 * Purpose: caches new report state and error, queues report for flushing     *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             report  - [IN/OUT] report to update                            *
 *             state   - [IN] new report state                                *
 *             info    - [IN] new report error message, can be NULL           *
 *                                                                            *
 * Comments: lastsent is marked for update unless the new state is error.     *
 *                                                                            *
 ******************************************************************************/
static void	rm_update_report(zbx_rm_t *manager, zbx_rm_report_t *report, int state, const char *info)
{
	zbx_uint32_t	changes = (ZBX_REPORT_STATE_ERROR == state ? 0 : ZBX_REPORT_UPDATE_LASTSENT);

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() reportid:" ZBX_FS_UI64 ", state:%d info:%s", __func__,
			report->reportid, state, ZBX_NULL2EMPTY_STR(info));

	if (state != report->state)
	{
		changes |= ZBX_REPORT_UPDATE_STATE;
		report->state = state;
	}

	if (NULL != info)
	{
		/* store the message only if it differs from the cached one */
		if (NULL == report->error || 0 != strcmp(report->error, info))
		{
			changes |= ZBX_REPORT_UPDATE_ERROR;
			report->error = zbx_strdup(report->error, info);
		}
	}
	else if (NULL != report->error && '\0' != *report->error)
	{
		/* no message - reset the previously cached non-empty one */
		changes |= ZBX_REPORT_UPDATE_ERROR;
		zbx_free(report->error);
	}

	if (0 != changes)
	{
		zbx_vector_uint64_append(&manager->flush_queue, report->reportid);
		report->flags |= changes;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: calculates time when report must be generated                     *
 *                                                                            *
 * Parameters: report - [IN]                                                  *
 *             now    - [IN] current time                                     *
 *             error  - [OUT]                                                 *
 *                                                                            *
 ******************************************************************************/
static int	rm_report_calc_nextcheck(const zbx_rm_report_t *report, int now, char **error)
{
	int	nextcheck;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() now:%s %s", __func__, zbx_date2str(now, NULL), zbx_time2str(now, NULL));

	if (ZBX_REPORT_CYCLE_WEEKLY == report->cycle && 0 == report->weekdays)
	{
		*error = zbx_strdup(NULL, "Cannot calculate report start time: weekdays must be set for weekly cycle");
		nextcheck = -1;
	}
	else
	{
		if (-1 == (nextcheck = zbx_get_report_nextcheck(now, report->cycle, report->weekdays,
				report->start_time)))
		{
			*error = zbx_dsprintf(NULL, "Cannot calculate report start time: %s",
					zbx_strerror(errno));
		}

	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() nextcheck:%s %s, error:%s", __func__, zbx_date2str(nextcheck, NULL),
			zbx_time2str(nextcheck, NULL), ZBX_NULL2EMPTY_STR(*error));

	return nextcheck;
}

/******************************************************************************
 *                                                                            *
 * Purpose: updates report parameters                                         *
 *                                                                            *
 * Parameters: report - [IN]                                                  *
 *             params - [IN] report parameters                                *
 *                                                                            *
 * Comments: Strings of old parameters are reused where name (and value)      *
 *           match, the rest are passed to report_destroy_params().           *
 *                                                                            *
 ******************************************************************************/
static void	rm_report_update_params(zbx_rm_report_t *report, zbx_vector_ptr_pair_t *params)
{
	zbx_vector_ptr_pair_t	old_params;

	zbx_vector_ptr_pair_create(&old_params);

	/* move the current parameters aside, the report vector is rebuilt from the new parameters */
	zbx_vector_ptr_pair_append_array(&old_params, report->params.values, report->params.values_num);
	zbx_vector_ptr_pair_clear(&report->params);

	for (int i = 0; i < params->values_num; i++)
	{
		zbx_ptr_pair_t	pair = {0};
		zbx_ptr_pair_t	*new_param = &params->values[i];

		/* look for an old parameter with the same name */
		for (int j = 0; j < old_params.values_num; j++)
		{
			zbx_ptr_pair_t	*old_param = &old_params.values[j];

			if (0 == strcmp(new_param->first, old_param->first))
			{
				/* take over the old name string */
				pair.first = old_param->first;
				old_param->first = NULL;

				if (0 == strcmp(new_param->second, old_param->second))
				{
					/* value has not changed - take over the old value string too */
					pair.second = old_param->second;
					old_param->second = NULL;
				}
				else
					pair.second = zbx_strdup(old_param->second, new_param->second);

				/* the matched old parameter is dropped from the vector, so its strings */
				/* are not touched again when the remaining old parameters are destroyed */
				zbx_vector_ptr_pair_remove_noorder(&old_params, j);
				break;
			}
		}

		/* no old parameter with this name - copy both name and value */
		if (NULL == pair.first)
		{
			pair.first = zbx_strdup(NULL, new_param->first);
			pair.second = zbx_strdup(NULL, new_param->second);
		}

		zbx_vector_ptr_pair_append(&report->params, pair);
	}

	/* only old parameters without a match in the new parameters are left here */
	report_destroy_params(&old_params);
}

/******************************************************************************
 *                                                                            *
 * Purpose: replaces report recipient users and excluded users                *
 *                                                                            *
 * Parameters: report     - [IN]                                              *
 *             users      - [IN] recipient users                              *
 *             users_excl - [IN] excluded user ids                            *
 *                                                                            *
 * Comments: The excluded user ids are stored sorted.                         *
 *                                                                            *
 ******************************************************************************/
static void	rm_report_update_users(zbx_rm_report_t *report, const zbx_vector_recipient_t *users,
		const zbx_vector_uint64_t *users_excl)
{
	zbx_vector_recipient_t	*dst_users = &report->users;
	zbx_vector_uint64_t	*dst_excl = &report->users_excl;

	/* excluded users */
	zbx_vector_uint64_clear(dst_excl);
	zbx_vector_uint64_append_array(dst_excl, users_excl->values, users_excl->values_num);
	zbx_vector_uint64_sort(dst_excl, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	/* recipients */
	zbx_vector_recipient_clear(dst_users);
	zbx_vector_recipient_append_array(dst_users, users->values, users->values_num);
}

/******************************************************************************
 *                                                                            *
 * Purpose: replaces report recipient user groups                             *
 *                                                                            *
 * Parameters: report     - [IN]                                              *
 *             usergroups - [IN] recipient user groups                        *
 *                                                                            *
 ******************************************************************************/
static void	rm_report_update_usergroups(zbx_rm_report_t *report, const zbx_vector_recipient_t *usergroups)
{
	zbx_vector_recipient_t	*dst = &report->usergroups;

	/* drop the cached groups and copy the new ones */
	zbx_vector_recipient_clear(dst);
	zbx_vector_recipient_append_array(dst, usergroups->values, usergroups->values_num);
}

/******************************************************************************
 *                                                                            *
 * Purpose: reloads frontend URL and session key from settings table          *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 * Comments: Settings missing in database are cached as empty strings.        *
 *                                                                            *
 ******************************************************************************/
static void	rm_update_cache_settings(zbx_rm_t *manager)
{
	zbx_db_result_t	result;
	zbx_db_row_t	row;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	result = zbx_db_select("select name, value_str from settings"
			" where name='session_key' or name='url'");

	/* reset both values, they are overwritten below by the rows found in database */
	manager->session_key = zbx_strdup(manager->session_key, "");
	manager->zabbix_url = zbx_strdup(manager->zabbix_url, "");

	while (NULL != (row = zbx_db_fetch(result)))
	{
		char	**target;

		if (0 == strcmp(row[0], "session_key"))
			target = &manager->session_key;
		else if (0 == strcmp(row[0], "url"))
			target = &manager->zabbix_url;
		else
			continue;

		*target = zbx_strdup(*target, row[1]);
	}

	zbx_db_free_result(result);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: checks if time is within report activity period                   *
 *                                                                            *
 * Parameters: report - [IN]                                                  *
 *             when   - [IN] time to check                                    *
 *                                                                            *
 * Return value: SUCCEED - report is active                                   *
 *               FAIL    - report is not yet active or has expired            *
 *                                                                            *
 * Comments: Zero active_since/active_till means the bound is not set.        *
 *                                                                            *
 ******************************************************************************/
static int	rm_is_report_active(const zbx_rm_report_t *report, int when)
{
	/* the start bound is inclusive, the end bound is exclusive */
	const int	started = (0 == report->active_since || report->active_since <= when);
	const int	expired = (0 != report->active_till && report->active_till <= when);

	return (started && !expired) ? SUCCEED : FAIL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: takes report out of the queue unless it is not queued             *
 *                                                                            *
 * Parameters: manager - [IN] report manager owning the queue                 *
 *             report  - [IN/OUT] report to dequeue, nextcheck is reset       *
 *                                                                            *
 ******************************************************************************/
static void	rm_dequeue_report(zbx_rm_t *manager, zbx_rm_report_t *report)
{
	/* zero nextcheck is treated as "not queued" - nothing to remove */
	if (0 == report->nextcheck)
		return;

	zbx_binary_heap_remove_direct(&manager->report_queue, report->reportid);
	report->nextcheck = 0;
}

/******************************************************************************
 *                                                                            *
 * Purpose: updates reports cache                                             *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             now     - [IN] current time, used to calculate next check      *
 *                                                                            *
 * Comments: New reports are added to cache, existing ones are refreshed and  *
 *           reports missing in database are removed. Enabled report is       *
 *           (re)queued when it is new, its schedule or timezone changed or   *
 *           its status changed (a report dequeued while disabled must be     *
 *           queued again after being enabled).                               *
 *                                                                            *
 ******************************************************************************/
static void	rm_update_cache_reports(zbx_rm_t *manager, int now)
{
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	zbx_vector_uint64_t	reportids;
	zbx_hashset_iter_t	iter;
	zbx_rm_report_t		*report, report_local;
	zbx_config_t		cfg;
	const char		*tz;
	char			*error = NULL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DEFAULT_TIMEZONE);

	zbx_vector_uint64_create(&reportids);

	result = zbx_db_select("select r.reportid,r.userid,r.name,r.dashboardid,r.period,r.cycle,r.weekdays,r.start_time,"
				"r.active_since,r.active_till,u.timezone,r.state,r.info,r.lastsent,r.status"
			" from report r,users u"
			" where r.userid=u.userid");

	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_uint64_t	reportid;
		int		nextcheck, start_time, active_since, active_till, reschedule = 0;
		unsigned char	period, cycle, weekdays, status;

		/* remember every report found in database to detect deleted reports afterwards */
		ZBX_STR2UINT64(reportid, row[0]);
		zbx_vector_uint64_append(&reportids, reportid);

		/* owner timezone set to the default value is replaced with the configured default timezone */
		tz = row[10];
		if (0 == strcmp(tz, ZBX_TIMEZONE_DEFAULT_VALUE))
			tz = cfg.default_timezone;

		ZBX_STR2UCHAR(period, row[4]);
		ZBX_STR2UCHAR(cycle, row[5]);
		ZBX_STR2UCHAR(weekdays, row[6]);
		ZBX_STR2UCHAR(status, row[14]);
		start_time = atoi(row[7]);
		active_since = atoi(row[8]);
		active_till = atoi(row[9]);

		if (NULL == (report = (zbx_rm_report_t *)zbx_hashset_search(&manager->reports, &reportid)))
		{
			/* new report - cache it and schedule the first check */
			report_local.reportid = reportid;
			ZBX_STR2UINT64(report_local.userid, row[1]);
			ZBX_STR2UINT64(report_local.dashboardid, row[3]);
			report_local.name = zbx_strdup(NULL, row[2]);
			report_local.timezone = zbx_strdup(NULL, tz);
			report_local.error = zbx_strdup(NULL, row[12]);
			report_local.period = period;
			report_local.cycle = cycle;
			report_local.weekdays = weekdays;
			report_local.status = status;
			report_local.start_time = start_time;
			ZBX_STR2UCHAR(report_local.state, row[11]);
			report_local.flags = 0;
			report_local.nextcheck = 0;
			report_local.active_since = active_since;
			report_local.active_till = active_till;
			report_local.lastsent = atoi(row[13]);
			zbx_vector_ptr_pair_create(&report_local.params);
			zbx_vector_recipient_create(&report_local.usergroups);
			zbx_vector_recipient_create(&report_local.users);
			zbx_vector_uint64_create(&report_local.users_excl);

			report = (zbx_rm_report_t *)zbx_hashset_insert(&manager->reports, &report_local,
					sizeof(report_local));

			reschedule = 1;
		}
		else
		{
			if (report->period != period || report->cycle != cycle || report->weekdays != weekdays ||
					report->start_time != start_time || report->active_since != active_since ||
					report->active_till != active_till)
			{
				reschedule = 1;
			}

			/* disabled report is dequeued with nextcheck reset, so without rescheduling on */
			/* status change a re-enabled report with unchanged schedule would never be queued */
			if (report->status != status)
				reschedule = 1;

			if (0 != strcmp(report->name, row[2]))
				report->name = zbx_strdup(report->name, row[2]);

			if (0 != strcmp(report->timezone, tz))
			{
				report->timezone = zbx_strdup(report->timezone, tz);
				reschedule = 1;
			}

			ZBX_STR2UINT64(report->userid, row[1]);
			ZBX_STR2UINT64(report->dashboardid, row[3]);
			report->period = period;
			report->cycle = cycle;
			report->weekdays = weekdays;
			report->start_time = start_time;
			report->active_since = active_since;
			report->active_till = active_till;
			report->status = status;
		}

		if (ZBX_REPORT_STATUS_DISABLED == report->status)
		{
			rm_dequeue_report(manager, report);
			continue;
		}

		if (0 == reschedule)
			continue;

		if (-1 != (nextcheck = rm_report_calc_nextcheck(report, now, &error)))
		{
			if (nextcheck != report->nextcheck)
			{
				if (SUCCEED == rm_is_report_active(report, nextcheck))
				{
					zbx_binary_heap_elem_t	elem = {report->reportid, (void *)report};
					int			nextcheck_old = report->nextcheck;

					report->nextcheck = nextcheck;

					/* non-zero old nextcheck means the report is already in the queue */
					if (0 != nextcheck_old)
						zbx_binary_heap_update_direct(&manager->report_queue, &elem);
					else
						zbx_binary_heap_insert(&manager->report_queue, &elem);
				}
				else
					rm_dequeue_report(manager, report);
			}
		}
		else
		{
			/* next check could not be calculated - store the error and stop scheduling the report */
			rm_update_report(manager, report, ZBX_REPORT_STATE_ERROR, error);
			rm_dequeue_report(manager, report);
			zbx_free(error);
		}
	}
	zbx_db_free_result(result);

	/* remove deleted reports from cache */
	zbx_vector_uint64_sort(&reportids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_hashset_iter_reset(&manager->reports, &iter);
	while (NULL != (report = (zbx_rm_report_t *)zbx_hashset_iter_next(&iter)))
	{
		if (FAIL == zbx_vector_uint64_bsearch(&reportids, report->reportid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
		{
			rm_dequeue_report(manager, report);
			rm_report_clean(report);
			zbx_hashset_iter_remove(&iter);
		}
	}

	zbx_vector_uint64_destroy(&reportids);

	zbx_config_clean(&cfg);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: reloads parameters of cached enabled reports from database        *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 * Comments: Rows are read grouped by report; collected parameters are        *
 *           applied to the report when rows of the next report begin.        *
 *                                                                            *
 ******************************************************************************/
static void	rm_update_cache_reports_params(zbx_rm_t *manager)
{
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	zbx_rm_report_t		*report = NULL;
	zbx_vector_ptr_pair_t	collected;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_ptr_pair_create(&collected);

	result = zbx_db_select("select rp.reportid,rp.name,rp.value"
			" from report_param rp,report r"
			" where rp.reportid=r.reportid"
				" and r.status=%d"
			" order by r.reportid",
				ZBX_REPORT_STATUS_ENABLED);

	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_uint64_t	reportid;
		zbx_ptr_pair_t	param;

		ZBX_STR2UINT64(reportid, row[0]);

		/* different report id - parameters of the previous report are complete */
		if (NULL != report && report->reportid != reportid)
		{
			rm_report_update_params(report, &collected);
			report_clear_params(&collected);
		}

		/* skip parameters of reports that are not cached */
		if (NULL == (report = (zbx_rm_report_t *)zbx_hashset_search(&manager->reports, &reportid)))
			continue;

		param.first = zbx_strdup(NULL, row[1]);
		param.second = zbx_strdup(NULL, row[2]);
		zbx_vector_ptr_pair_append(&collected, param);
	}
	zbx_db_free_result(result);

	/* apply parameters collected for the last report */
	if (NULL != report && 0 != collected.values_num)
		rm_report_update_params(report, &collected);

	report_destroy_params(&collected);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: reloads recipient users of cached enabled reports from database   *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 * Comments: Included users are collected together with their access user,    *
 *           excluded users are collected as plain user identifiers.          *
 *                                                                            *
 ******************************************************************************/
static void	rm_update_cache_reports_users(zbx_rm_t *manager)
{
#define ZBX_REPORT_INCLUDE_USER		0
#define ZBX_REPORT_EXCLUDE_USER		1
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	zbx_rm_report_t		*report = NULL;
	zbx_vector_recipient_t	included;
	zbx_vector_uint64_t	excluded;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_recipient_create(&included);
	zbx_vector_uint64_create(&excluded);

	result = zbx_db_select("select ru.reportid,ru.userid,ru.exclude,ru.access_userid"
			" from report_user ru,report r"
			" where ru.reportid=r.reportid"
				" and r.status=%d"
			" order by r.reportid",
				ZBX_REPORT_STATUS_ENABLED);

	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_uint64_t		reportid, userid;
		zbx_rm_recipient_t	recipient;

		ZBX_STR2UINT64(reportid, row[0]);

		/* different report id - users of the previous report are complete */
		if (NULL != report && report->reportid != reportid)
		{
			rm_report_update_users(report, &included, &excluded);
			zbx_vector_recipient_clear(&included);
			zbx_vector_uint64_clear(&excluded);
		}

		/* skip users of reports that are not cached */
		if (NULL == (report = (zbx_rm_report_t *)zbx_hashset_search(&manager->reports, &reportid)))
			continue;

		ZBX_STR2UINT64(userid, row[1]);

		if (ZBX_REPORT_INCLUDE_USER != atoi(row[2]))
		{
			zbx_vector_uint64_append(&excluded, userid);
			continue;
		}

		recipient.id = userid;
		ZBX_DBROW2UINT64(recipient.access_userid, row[3]);
		zbx_vector_recipient_append(&included, recipient);
	}
	zbx_db_free_result(result);

	/* apply users collected for the last report */
	if (NULL != report && (0 != included.values_num || 0 != excluded.values_num))
		rm_report_update_users(report, &included, &excluded);

	zbx_vector_uint64_destroy(&excluded);
	zbx_vector_recipient_destroy(&included);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
#undef ZBX_REPORT_INCLUDE_USER
#undef ZBX_REPORT_EXCLUDE_USER
}

/******************************************************************************
 *                                                                            *
 * Purpose: reloads recipient user groups of cached enabled reports from      *
 *          database                                                          *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 ******************************************************************************/
static void	rm_update_cache_reports_usergroups(zbx_rm_t *manager)
{
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	zbx_rm_report_t		*report = NULL;
	zbx_vector_recipient_t	groups;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_recipient_create(&groups);

	result = zbx_db_select("select rg.reportid,rg.usrgrpid,rg.access_userid"
			" from report_usrgrp rg,report r"
			" where rg.reportid=r.reportid"
				" and r.status=%d"
			" order by r.reportid",
				ZBX_REPORT_STATUS_ENABLED);

	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_uint64_t		reportid;
		zbx_rm_recipient_t	group;

		ZBX_STR2UINT64(reportid, row[0]);

		/* different report id - user groups of the previous report are complete */
		if (NULL != report && report->reportid != reportid)
		{
			rm_report_update_usergroups(report, &groups);
			zbx_vector_recipient_clear(&groups);
		}

		/* skip user groups of reports that are not cached */
		if (NULL == (report = (zbx_rm_report_t *)zbx_hashset_search(&manager->reports, &reportid)))
			continue;

		ZBX_STR2UINT64(group.id, row[1]);
		ZBX_DBROW2UINT64(group.access_userid, row[2]);
		zbx_vector_recipient_append(&groups, group);
	}
	zbx_db_free_result(result);

	/* apply user groups collected for the last report */
	if (NULL != report && 0 != groups.values_num)
		rm_report_update_usergroups(report, &groups);

	zbx_vector_recipient_destroy(&groups);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

#undef ZBX_REPORT_STATUS_ENABLED
#undef ZBX_REPORT_STATUS_DISABLED

/******************************************************************************
 *                                                                            *
 * Purpose: writes cached reports to log at trace level                       *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 ******************************************************************************/
static void	rm_dump_cache(zbx_rm_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_rm_report_t		*report;
	char			*line = NULL;
	size_t			line_alloc = 0, line_offset;
	int			i;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_hashset_iter_reset(&manager->reports, &iter);
	while (NULL != (report = (zbx_rm_report_t *)zbx_hashset_iter_next(&iter)))
	{
		zabbix_log(LOG_LEVEL_TRACE, "reportid:" ZBX_FS_UI64 ", name:%s, userid:" ZBX_FS_UI64 ", dashboardid:"
				ZBX_FS_UI64 ", period:%d, cycle:%d, weekdays:0x%x",
				report->reportid, report->name, report->userid, report->dashboardid, report->period,
				report->cycle, report->weekdays);

		/* the buffer is reused for every report, only the write position is reset */
		line_offset = 0;
		zbx_strcpy_alloc(&line, &line_alloc, &line_offset, "active:");

		/* unset activity period boundaries are left blank */
		if (0 != report->active_since)
		{
			zbx_snprintf_alloc(&line, &line_alloc, &line_offset, "%s %s",
					zbx_date2str(report->active_since, NULL),
					zbx_time2str(report->active_since, NULL));
		}

		zbx_strcpy_alloc(&line, &line_alloc, &line_offset, " - ");

		if (0 != report->active_till)
		{
			zbx_snprintf_alloc(&line, &line_alloc, &line_offset, "%s %s",
					zbx_date2str(report->active_till, NULL),
					zbx_time2str(report->active_till, NULL));
		}

		zbx_snprintf_alloc(&line, &line_alloc, &line_offset, ", start_time:%d:%02d:%02d, timezone:%s",
				report->start_time / SEC_PER_HOUR, report->start_time % SEC_PER_HOUR / SEC_PER_MIN,
				report->start_time % SEC_PER_MIN, report->timezone);
		zbx_snprintf_alloc(&line, &line_alloc, &line_offset, ", nextcheck:%s %s",
				zbx_date2str(report->nextcheck, NULL), zbx_time2str(report->nextcheck, NULL));
		zabbix_log(LOG_LEVEL_TRACE, "  %s", line);

		zabbix_log(LOG_LEVEL_TRACE, "  params:");
		for (i = 0; i < report->params.values_num; i++)
		{
			zabbix_log(LOG_LEVEL_TRACE, "    %s:%s", (char *)report->params.values[i].first,
					(char *)report->params.values[i].second);
		}

		zabbix_log(LOG_LEVEL_TRACE, "  users:");
		for (i = 0; i < report->users.values_num; i++)
		{
			zabbix_log(LOG_LEVEL_TRACE, "    userid:" ZBX_FS_UI64 ", acess_userid:" ZBX_FS_UI64,
					report->users.values[i].id, report->users.values[i].access_userid);
		}

		zabbix_log(LOG_LEVEL_TRACE, "  usergroups:");
		for (i = 0; i < report->usergroups.values_num; i++)
		{
			zabbix_log(LOG_LEVEL_TRACE, "    usrgrpid:" ZBX_FS_UI64 ", acess_userid:" ZBX_FS_UI64,
					report->usergroups.values[i].id, report->usergroups.values[i].access_userid);
		}

		zabbix_log(LOG_LEVEL_TRACE, "  exclude:");
		for (i = 0; i < report->users_excl.values_num; i++)
			zabbix_log(LOG_LEVEL_TRACE, "    userid:" ZBX_FS_UI64, report->users_excl.values[i]);
	}

	zbx_free(line);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: refreshes settings and report cache from database                 *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 * Comments: the cache is dumped to log when trace log level is enabled       *
 *                                                                            *
 ******************************************************************************/
static void	rm_update_cache(zbx_rm_t *manager)
{
	/* the time is taken once, before any database query is made */
	const int	now = (int)time(NULL);

	rm_update_cache_settings(manager);

	/* reports must be cached first - params, users and user groups are attached to cached reports */
	rm_update_cache_reports(manager, now);
	rm_update_cache_reports_params(manager);
	rm_update_cache_reports_users(manager);
	rm_update_cache_reports_usergroups(manager);

	if (SUCCEED != ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
		return;

	rm_dump_cache(manager);
}

/* report delivery destination: recipient address and the media type it belongs to */
typedef struct
{
	zbx_uint64_t	mediatypeid;	/* media type identifier, also used as sorting key for destinations */
	char		*recipient;	/* allocated copy of media.sendto, released by zbx_report_dst_free() */
}
zbx_report_dst_t;

/* releases report destination together with the recipient string it owns */
static void	zbx_report_dst_free(void *ptr)
{
	zbx_report_dst_t	*destination = (zbx_report_dst_t *)ptr;

	zbx_free(destination->recipient);
	zbx_free(destination);
}

/******************************************************************************
 *                                                                            *
 * Purpose: processes job by sending it to writer                             *
 *                                                                            *
 * Parameters: writer - [IN]                                                  *
 *             job    - [IN] view to process                                  *
 *             error  - [OUT] error message                                   *
 *                                                                            *
 * Return value: SUCCEED - all requests were sent to the writer               *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 * Comments: Media type rows must be fetched ordered by mediatypeid, because  *
 *           recipients are matched to them with a single pass over the       *
 *           destinations sorted by mediatypeid.                              *
 *                                                                            *
 ******************************************************************************/
static int	rm_writer_process_job(zbx_rm_writer_t *writer, zbx_rm_job_t *job, char **error)
{
	unsigned char		*data = NULL;
	zbx_uint32_t		size;
	int			rc, ret = FAIL, index = 0;
	char			*sql = NULL;
	size_t			sql_alloc = 0, sql_offset = 0;
	zbx_vector_uint64_t	mediatypeids;
	zbx_vector_ptr_t	dsts;
	zbx_vector_str_t	recipients;
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	zbx_report_dst_t	*dst;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() url:%s", __func__, job->url);

	zbx_vector_uint64_create(&mediatypeids);
	zbx_vector_ptr_create(&dsts);

	/* collect active email media of the job recipients */
	zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset,
			"select m.sendto,mt.mediatypeid"
			" from media m,media_type mt"
			" where");
	zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "m.userid", job->userids.values,
			job->userids.values_num);
	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
				" and m.active=%d"
				" and m.mediatypeid=mt.mediatypeid"
				" and mt.type=%d"
				" and mt.status=%d",
			MEDIA_STATUS_ACTIVE, MEDIA_TYPE_EMAIL, MEDIA_TYPE_STATUS_ACTIVE);

	result = zbx_db_select("%s", sql);

	while (NULL != (row = zbx_db_fetch(result)))
	{
		dst = (zbx_report_dst_t *)zbx_malloc(NULL, sizeof(zbx_report_dst_t));
		ZBX_STR2UINT64(dst->mediatypeid, row[1]);
		dst->recipient = zbx_strdup(NULL, row[0]);
		zbx_vector_ptr_append(&dsts, dst);
		zbx_vector_uint64_append(&mediatypeids, dst->mediatypeid);
	}
	zbx_db_free_result(result);

	if (0 == dsts.values_num)
	{
		char	*username = NULL;

		if (1 != job->is_test_report)
		{
			*error = zbx_dsprintf(NULL, "No media configured for the report recipients");
			goto out;
		}

		/* test report has a single recipient - name it in the error message */
		sql = zbx_dsprintf(sql, "select username from users where userid=" ZBX_FS_UI64, job->userids.values[0]);

		result = zbx_db_select("%s", sql);

		/* username of the user who tests a scheduled report should always be present */
		if (NULL != (row = zbx_db_fetch(result)))
			username = row[0];

		/* username points into the result row, so the message is built before freeing the result */
		*error = zbx_dsprintf(NULL, "No media configured for the report recipient '%s'",
				ZBX_NULL2STR(username));
		zbx_db_free_result(result);
		goto out;
	}

	size = report_serialize_begin_report(&data, job->report_name, job->url, job->session->cookie, &job->params);

	if (SUCCEED != zbx_ipc_client_send(writer->client, ZBX_IPC_REPORTER_BEGIN_REPORT, data, size))
	{
		THIS_SHOULD_NEVER_HAPPEN;
		*error = zbx_dsprintf(NULL, "Cannot send message to report writer");
		goto out;
	}

	zbx_free(data);

	ret = SUCCEED;

	zbx_vector_str_create(&recipients);

	/* destinations are sorted by media type id to be grouped per media type below */
	zbx_vector_ptr_sort(&dsts, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
	zbx_vector_uint64_sort(&mediatypeids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_uint64_uniq(&mediatypeids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	sql_offset = 0;

	zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset,
			"select mediatypeid,type,smtp_server,smtp_helo,smtp_email,exec_path,gsm_modem,username,"
				"passwd,smtp_port,smtp_security,smtp_verify_peer,smtp_verify_host,"
				"smtp_authentication,maxsessions,maxattempts,attempt_interval,"
				"message_format,script,timeout,name"
			" from media_type"
			" where");

	zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "mediatypeid", mediatypeids.values,
			mediatypeids.values_num);

	/* rows must come in the same order as the sorted destinations, otherwise the forward-only */
	/* index below would skip recipients of media types returned out of order */
	zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " order by mediatypeid");

	result = zbx_db_select("%s", sql);

	while (NULL != (row = zbx_db_fetch(result)) && SUCCEED == ret)
	{
		zbx_db_mediatype	mt;

		ZBX_STR2UINT64(mt.mediatypeid, row[0]);

		mt.type = atoi(row[1]);
		mt.smtp_server = zbx_strdup(NULL, row[2]);
		mt.smtp_helo = zbx_strdup(NULL, row[3]);
		mt.smtp_email = zbx_strdup(NULL, row[4]);
		mt.exec_path = zbx_strdup(NULL, row[5]);
		mt.gsm_modem = zbx_strdup(NULL, row[6]);
		mt.username = zbx_strdup(NULL, row[7]);
		mt.passwd = zbx_strdup(NULL, row[8]);
		mt.smtp_port = (unsigned short)atoi(row[9]);
		ZBX_STR2UCHAR(mt.smtp_security, row[10]);
		ZBX_STR2UCHAR(mt.smtp_verify_peer, row[11]);
		ZBX_STR2UCHAR(mt.smtp_verify_host, row[12]);
		ZBX_STR2UCHAR(mt.smtp_authentication, row[13]);
		mt.maxsessions = atoi(row[14]);
		mt.maxattempts = atoi(row[15]);
		mt.attempt_interval = zbx_strdup(NULL, row[16]);
		ZBX_STR2UCHAR(mt.message_format, row[17]);
		mt.script = zbx_strdup(NULL, row[18]);
		mt.timeout = zbx_strdup(NULL, row[19]);
		mt.name = zbx_strdup(NULL, row[20]);

		/* take the run of destinations that belong to this media type; */
		/* recipient strings stay owned by the destinations             */
		for (; index < dsts.values_num; index++)
		{
			dst = (zbx_report_dst_t *)dsts.values[index];
			if (dst->mediatypeid != mt.mediatypeid)
				break;
			zbx_vector_str_append(&recipients, dst->recipient);
		}

		if (0 != recipients.values_num)
		{
			size = report_serialize_send_report(&data, &mt, &recipients);
			ret = zbx_ipc_client_send(writer->client, ZBX_IPC_REPORTER_SEND_REPORT, data, size);
			zbx_free(data);
		}
		else
			THIS_SHOULD_NEVER_HAPPEN;

		zbx_vector_str_clear(&recipients);
		zbx_db_mediatype_clean(&mt);
	}
	zbx_db_free_result(result);

	zbx_vector_str_destroy(&recipients);

	/* attempt to send finish request even if last sending failed */
	rc = zbx_ipc_client_send(writer->client, ZBX_IPC_REPORTER_END_REPORT, NULL, 0);
	if (SUCCEED == ret)
		ret = rc;
out:
	zbx_free(sql);
	zbx_free(data);
	zbx_vector_ptr_clear_ext(&dsts, zbx_report_dst_free);
	zbx_vector_ptr_destroy(&dsts);
	zbx_vector_uint64_destroy(&mediatypeids);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: adds recipient to the job of its access user, creating the job if *
 *          it does not exist yet                                             *
 *                                                                            *
 * Parameters: manager       - [IN]                                           *
 *             report        - [IN] report to process                         *
 *             userid        - [IN] recipient user id                         *
 *             access_userid - [IN] user id used to create the report         *
 *             now           - [IN]                                           *
 *             params        - [IN] report parameters                         *
 *             jobs          - [IN/OUT] created jobs                          *
 *             error         - [OUT] error message                            *
 *                                                                            *
 * Return value: SUCCEED - user was added to existing job or new one was      *
 *                         successfully created.                              *
 *               FAIL    - failed to create a new job for the user.           *
 *                                                                            *
 * Comments: Excluded users are silently skipped. New job is created with the *
 *           recipient already passed to rm_create_job(), so it is appended   *
 *           only to jobs that existed before the call.                       *
 *                                                                            *
 ******************************************************************************/
static int	rm_jobs_add_user(zbx_rm_t *manager, zbx_rm_report_t *report, zbx_uint64_t userid,
		zbx_uint64_t access_userid, int now, const zbx_vector_ptr_pair_t *params, zbx_vector_ptr_t *jobs,
		char **error)
{
	int		i;
	zbx_rm_job_t	*job = NULL;

	/* users from the exclusion list do not receive the report */
	if (FAIL != zbx_vector_uint64_search(&report->users_excl, userid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
		return SUCCEED;

	/* one job is shared by all recipients with the same access user */
	for (i = 0; i < jobs->values_num; i++)
	{
		job = (zbx_rm_job_t *)jobs->values[i];
		if (job->access_userid == access_userid)
			break;
	}

	if (i == jobs->values_num)
	{
		/* the recipient is handed over to the new job at creation time */
		if (NULL == (job = rm_create_job(manager, report->name, report->dashboardid, access_userid, now,
				report->period, &userid, 1, params, error)))
		{
			return FAIL;
		}
		zbx_vector_ptr_append(jobs, job);
	}
	else
		zbx_vector_uint64_append(&job->userids, userid);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: creates jobs for members of report recipient user groups          *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             report  - [IN] report to process                               *
 *             now     - [IN]                                                 *
 *             params  - [IN] report parameters                               *
 *             jobs    - [IN/OUT] created jobs                                *
 *             error   - [OUT] error message                                  *
 *                                                                            *
 * Return value: SUCCEED - jobs were created successfully                     *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 * Comments: Group access user is used to create the report when it is set,   *
 *           otherwise the group member is used.                              *
 *                                                                            *
 ******************************************************************************/
static int	rm_report_create_usergroup_jobs(zbx_rm_t *manager, zbx_rm_report_t *report, int now,
		const zbx_vector_ptr_pair_t *params, zbx_vector_ptr_t *jobs, char **error)
{
	zbx_db_row_t		row;
	zbx_db_result_t		result;
	zbx_vector_uint64_t	usrgrpids;
	int			ret = SUCCEED;
	char			*sql = NULL;
	size_t			sql_alloc = 0, sql_offset = 0;

	zbx_vector_uint64_create(&usrgrpids);

	for (int i = 0; i < report->usergroups.values_num; i++)
		zbx_vector_uint64_append(&usrgrpids, report->usergroups.values[i].id);

	zbx_vector_uint64_sort(&usrgrpids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_uint64_uniq(&usrgrpids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "select userid,usrgrpid from users_groups where");
	zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "usrgrpid", usrgrpids.values,
			usrgrpids.values_num);

	result = zbx_db_select("%s", sql);

	/* stop at the first member that could not be added */
	while (SUCCEED == ret && NULL != (row = zbx_db_fetch(result)))
	{
		zbx_uint64_t	userid, usrgrpid, access_userid = 0;

		ZBX_STR2UINT64(userid, row[0]);
		ZBX_STR2UINT64(usrgrpid, row[1]);

		/* take access user of the first recipient group matching the member's group */
		for (int i = 0; i < report->usergroups.values_num; i++)
		{
			const zbx_rm_recipient_t	*group = &report->usergroups.values[i];

			if (group->id != usrgrpid)
				continue;

			access_userid = group->access_userid;
			break;
		}

		/* without group access user the report is created on behalf of the member itself */
		if (0 == access_userid)
			access_userid = userid;

		if (SUCCEED != rm_jobs_add_user(manager, report, userid, access_userid, now, params, jobs, error))
			ret = FAIL;
	}

	zbx_db_free_result(result);

	zbx_free(sql);
	zbx_vector_uint64_destroy(&usrgrpids);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: resolves MVAR_TIME and MVAR_TIMESTAMP macros in report context    *
 *                                                                            *
 * Parameters: p            - [IN] macro resolver data structure              *
 *             args         - [IN] list of variadic parameters                *
 *                                 Expected content:                          *
 *                                  - const char *tz: name of timezone        *
 *                                      (can be NULL)                         *
 *             replace_with - [OUT] pointer to value to replace macro with    *
 *             data         - [IN/OUT] pointer to original input raw string   *
 *                                  (for macro in macro resolving)            *
 *             error        - [OUT] pointer to pre-allocated error message    *
 *                                  buffer (can be NULL)                      *
 *             maxerrlen    - [IN] size of error message buffer (can be 0 if  *
 *                                 'error' is NULL)                           *
 *                                                                            *
 * Return value: SUCCEED - always                                             *
 *                                                                            *
 ******************************************************************************/
static int	macro_report_resolv(zbx_macro_resolv_data_t *p, va_list args, char **replace_with, char **data,
		char *error, size_t maxerrlen)
{
	/* Passed arguments */
	const char	*tz = va_arg(args, const char *);

	/* these parameters are part of the resolver callback signature, but are not needed here */
	ZBX_UNUSED(data);
	ZBX_UNUSED(error);
	ZBX_UNUSED(maxerrlen);

	if (0 == strcmp(p->macro, MVAR_TIME))
	{
		const char	*formatted = zbx_time2str(time(NULL), tz);

		*replace_with = zbx_strdup(*replace_with, formatted);

		return SUCCEED;
	}

	if (0 == strcmp(p->macro, MVAR_TIMESTAMP))
		*replace_with = zbx_dsprintf(*replace_with, "%ld", (long)time(NULL));

	/* any other macro leaves replace_with untouched */
	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: creates jobs to process report                                    *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             report  - [IN] report to process                               *
 *             now     - [IN]                                                 *
 *             error   - [OUT] error message                                  *
 *                                                                            *
 * Return value: SUCCEED - jobs were created successfully                     *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 * Comments: Jobs are created inside a database transaction. On failure the   *
 *           transaction is rolled back and the jobs created so far are       *
 *           freed, because no batch owns them yet.                           *
 *                                                                            *
 ******************************************************************************/
static int	rm_report_create_jobs(zbx_rm_t *manager, zbx_rm_report_t *report, int now, char **error)
{
	zbx_vector_ptr_t	jobs;
	int			jobs_num, ret = FAIL;
	zbx_uint64_t		access_userid;
	zbx_rm_batch_t		*batch, batch_local;
	zbx_vector_ptr_pair_t	params;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() reportid:" ZBX_FS_UI64 , __func__, report->reportid);

	zbx_vector_ptr_create(&jobs);
	zbx_vector_ptr_pair_create(&params);

	/* work on a private copy of the report parameters, so that macro expansion */
	/* in subject and body does not modify the strings stored in the report     */
	for (int i = 0; i < report->params.values_num; i++)
	{
		zbx_ptr_pair_t	pair;

		pair.first = zbx_strdup(NULL, report->params.values[i].first);
		pair.second = zbx_strdup(NULL, report->params.values[i].second);

		if (0 == strcmp(pair.first, ZBX_REPORT_PARAM_BODY) || 0 == strcmp(pair.first, ZBX_REPORT_PARAM_SUBJECT))
		{
			zbx_substitute_macros((char **)&pair.second, NULL, 0, macro_report_resolv, NULL);
		}

		zbx_vector_ptr_pair_append(&params, pair);
	}

	zbx_db_begin();

	for (int i = 0; i < report->users.values_num; i++)
	{
		/* without an explicit access user the report is generated on behalf of the recipient */
		if (0 == (access_userid = report->users.values[i].access_userid))
			access_userid = report->users.values[i].id;

		if (SUCCEED != rm_jobs_add_user(manager, report, report->users.values[i].id, access_userid, now,
				&params, &jobs, error))
		{
			goto out;
		}
	}

	if (0 != report->usergroups.values_num)
	{
		if (SUCCEED != rm_report_create_usergroup_jobs(manager, report, now, &params, &jobs, error))
		{
			goto out;
		}
	}

	/* create job batch for result tracking */
	batch_local.batchid = ++manager->last_batchid;
	batch = (zbx_rm_batch_t *)zbx_hashset_insert(&manager->batches, &batch_local, sizeof(batch_local));
	batch->reportid = report->reportid;
	batch->error_num = 0;
	batch->sent_num = 0;
	batch->total_num = 0;
	batch->info = NULL;
	batch->info_alloc = 0;
	batch->info_offset = 0;
	zbx_vector_ptr_create(&batch->jobs);
	zbx_vector_ptr_append_array(&batch->jobs, jobs.values, jobs.values_num);

	/* queue jobs */
	for (int i = 0; i < jobs.values_num; i++)
	{
		zbx_rm_job_t	*job = (zbx_rm_job_t *)jobs.values[i];

		zbx_vector_uint64_sort(&job->userids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
		zbx_vector_uint64_uniq(&job->userids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
		(void)zbx_list_append(&manager->job_queue, job, NULL);
		job->batchid = batch->batchid;
	}

	ret = SUCCEED;
out:
	if (SUCCEED == ret)
	{
		zbx_db_commit();
		jobs_num = jobs.values_num;
	}
	else
	{
		zbx_db_rollback();
		jobs_num = 0;

		/* the failure paths are taken before the jobs are attached to a batch or */
		/* queued, so the local vector is their only owner - release them here    */
		for (int i = 0; i < jobs.values_num; i++)
			rm_job_free((zbx_rm_job_t *)jobs.values[i]);
	}

	report_destroy_params(&params);

	/* on success the job pointers live on in the batch and the job queue, */
	/* only the local array is released                                    */
	zbx_vector_ptr_destroy(&jobs);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s jobs:%d %s", __func__, zbx_result_string(ret), jobs_num,
			ZBX_NULL2EMPTY_STR(*error));

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: creates jobs for the queued reports that are due                  *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             now     - [IN] current time                                    *
 *                                                                            *
 * Return value: The number of reports that were processed and requeued.      *
 *                                                                            *
 ******************************************************************************/
static int	rm_schedule_jobs(zbx_rm_t *manager, int now)
{
	int	scheduled_num = 0;
	char	*error = NULL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() queue:%d", __func__, manager->report_queue.elems_num);

	for (;;)
	{
		zbx_rm_report_t	*report;
		int		res;

		if (SUCCEED == zbx_binary_heap_empty(&manager->report_queue))
			break;

		report = (zbx_rm_report_t *)zbx_binary_heap_find_min(&manager->report_queue)->data;

		/* the report at the top of the queue is not due yet - stop processing */
		if (report->nextcheck > now)
			break;

		/* nextcheck is reset while the report is out of the queue */
		zbx_binary_heap_remove_min(&manager->report_queue);
		report->nextcheck = 0;

		res = rm_report_create_jobs(manager, report, now, &error);

		if (SUCCEED == res)
		{
			int	nextcheck = rm_report_calc_nextcheck(report, now, &error);

			if (-1 == nextcheck)
			{
				res = FAIL;
			}
			else if (SUCCEED == rm_is_report_active(report, nextcheck))
			{
				zbx_binary_heap_elem_t	requeued = {report->reportid, report};

				report->nextcheck = nextcheck;
				zbx_binary_heap_insert(&manager->report_queue, &requeued);

				scheduled_num++;
			}
		}

		if (FAIL != res)
			continue;

		/* either job creation or next check calculation failed */
		rm_update_report(manager, report, ZBX_REPORT_STATE_ERROR, error);

		zabbix_log(LOG_LEVEL_DEBUG, "Cannot process report: %s", error);
		zbx_free(error);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() jobs:%d", __func__, scheduled_num);

	return scheduled_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: finishes job and updates the report when its batch is complete    *
 *                                                                            *
 * Parameters: manager   - [IN]                                               *
 *             job       - [IN]                                               *
 *             status    - [IN] job status                                    *
 *             error     - [IN] error message of failed job                   *
 *             sent_num  - [IN] number of successfully sent reports           *
 *             total_num - [IN] total number of reports to send               *
 *                                                                            *
 ******************************************************************************/
static void	rm_finish_job(zbx_rm_t *manager, zbx_rm_job_t *job, int status, const char *error, int sent_num,
		int total_num)
{
	zbx_rm_batch_t	*batch;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (NULL == (batch = (zbx_rm_batch_t *)zbx_hashset_search(&manager->batches, &job->batchid)))
	{
		THIS_SHOULD_NEVER_HAPPEN;
		return;
	}

	if (SUCCEED != status)
	{
		size_t	offset = batch->info_offset;

		batch->error_num++;

		/* each failure is appended as a separate capitalized sentence; */
		/* a missing error message is treated as an empty one          */
		zbx_strcpy_alloc(&batch->info, &batch->info_alloc, &batch->info_offset,
				ZBX_NULL2EMPTY_STR(error));

		/* character classification functions require a value representable as   */
		/* unsigned char - passing a negative plain char is undefined behavior   */
		batch->info[offset] = (char)toupper((unsigned char)batch->info[offset]);
		zbx_strcpy_alloc(&batch->info, &batch->info_alloc, &batch->info_offset, ".\n");
	}
	else
	{
		batch->sent_num += sent_num;
		batch->total_num += total_num;
	}

	/* detach the job from its batch and release it */
	for (int i = 0; i < batch->jobs.values_num; i++)
	{
		if (batch->jobs.values[i] == job)
		{
			rm_job_free(job);
			zbx_vector_ptr_remove_noorder(&batch->jobs, i);
			break;
		}
	}

	/* the last job of the batch has finished - store the summary in the report */
	if (0 == batch->jobs.values_num)
	{
		zbx_rm_report_t	*report;

		zabbix_log(LOG_LEVEL_DEBUG, "%s() batch finished with %d failed jobs", __func__, batch->error_num);

		if (NULL != (report = (zbx_rm_report_t *)zbx_hashset_search(&manager->reports, &batch->reportid)))
		{
			char	*info = NULL;
			size_t	info_alloc = 0, info_offset = 0;

			status = ZBX_REPORT_STATE_SUCCESS;
			if (batch->sent_num != batch->total_num)
			{
				zbx_snprintf_alloc(&info, &info_alloc, &info_offset,
						"Failed to sent %d report(s) from %d.\n",
						batch->total_num - batch->sent_num, batch->total_num);
				status = ZBX_REPORT_STATE_SUCCESS_INFO;
			}

			/* failed jobs take precedence over partially sent reports */
			if (0 != batch->error_num)
			{
				zbx_snprintf_alloc(&info, &info_alloc, &info_offset,
						"Failed to create %d report(s):\n%s",
						batch->error_num, batch->info);
				status = ZBX_REPORT_STATE_ERROR;
			}

			rm_update_report(manager, report, status, info);

			zbx_free(info);
		}

		rm_batch_clean(batch);
		zbx_hashset_remove_direct(&manager->batches, batch);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: reports test request failure to the requesting client             *
 *                                                                            *
 * Parameters: client - [IN] client that requested the test                   *
 *             error  - [IN] error message                                    *
 *                                                                            *
 ******************************************************************************/
static void	rm_send_test_error_result(zbx_ipc_client_t *client, const char *error)
{
	unsigned char	*response;
	zbx_uint32_t	response_len = report_serialize_response(&response, FAIL, error, NULL);

	zbx_ipc_client_send(client, ZBX_IPC_REPORTER_TEST_RESULT, response, response_len);

	/* the serialized response is not needed after sending */
	zbx_free(response);
}

/******************************************************************************
 *                                                                            *
 * Purpose: assigns queued jobs to free report writers                        *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *                                                                            *
 * Return value: The number of jobs taken from the queue.                     *
 *                                                                            *
 ******************************************************************************/
static int	rm_process_jobs(zbx_rm_t *manager)
{
	int	started_num = 0;
	char	*error = NULL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* every job taken from the queue is counted, including those that fail to start */
	for (; SUCCEED != zbx_queue_ptr_empty(&manager->free_writers); started_num++)
	{
		zbx_rm_job_t	*job;
		zbx_rm_writer_t	*writer;

		if (SUCCEED != zbx_list_pop(&manager->job_queue, (void **)&job))
			break;

		writer = zbx_queue_ptr_pop(&manager->free_writers);

		if (SUCCEED == rm_writer_process_job(writer, job, &error))
		{
			writer->job = job;
			continue;
		}

		if (NULL == job->client)
		{
			rm_finish_job(manager, job, FAIL, error, 0, 0);
		}
		else
		{
			/* test request - the error goes straight to the requester */
			rm_send_test_error_result(job->client, error);
			rm_job_free(job);
		}

		/* the writer did not take the job, make it available again */
		zbx_queue_ptr_push(&manager->free_writers, writer);
		zbx_free(error);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() jobs:%d", __func__, started_num);

	return started_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: queues test report job requested by client                        *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             client  - [IN] client that requested the test                  *
 *             message - [IN] received message                                *
 *             error   - [OUT] error message                                  *
 *                                                                            *
 * Return value: SUCCEED - test report job was created successfully           *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 ******************************************************************************/
static int	rm_test_report(zbx_rm_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message, char **error)
{
	zbx_vector_ptr_pair_t	params;
	zbx_uint64_t		dashboardid, userid, access_userid;
	unsigned char		period;
	int			report_time, ret = FAIL;
	char			*name;
	zbx_rm_job_t		*job;

	zbx_vector_ptr_pair_create(&params);

	report_deserialize_test_report(message->data, &name, &dashboardid, &userid, &access_userid, &report_time,
			&period, &params);

	/* macros are expanded only in the subject and body parameters */
	for (int i = 0; i < params.values_num; i++)
	{
		zbx_ptr_pair_t	*param = &params.values[i];

		if (0 != strcmp(param->first, ZBX_REPORT_PARAM_BODY) &&
				0 != strcmp(param->first, ZBX_REPORT_PARAM_SUBJECT))
		{
			continue;
		}

		zbx_substitute_macros((char **)&param->second, NULL, 0, macro_report_resolv, NULL);
	}

	job = rm_create_job(manager, name, dashboardid, access_userid, report_time, period, &userid, 1, &params,
			error);

	if (NULL != job)
	{
		job->is_test_report = 1;

		/* the job holds its own reference to the requester */
		zbx_ipc_client_addref(client);
		job->client = client;

		(void)zbx_list_append(&manager->job_queue, job, NULL);
		ret = SUCCEED;
	}

	zbx_free(name);
	report_destroy_params(&params);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: processes report result message                                   *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             client  - [IN] connected writer                                *
 *             message - [IN] received message                                *
 *                                                                            *
 ******************************************************************************/
static void	rm_process_result(zbx_rm_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message)
{
	zbx_rm_writer_t	*writer;

	if (NULL == (writer = rm_get_writer(manager, client)))
	{
		THIS_SHOULD_NEVER_HAPPEN;
		return;
	}

	/* a result from a writer without an assigned job cannot be matched to anything; */
	/* ignore it instead of dereferencing NULL and queueing the writer as free again  */
	if (NULL == writer->job)
	{
		THIS_SHOULD_NEVER_HAPPEN;
		return;
	}

	if (NULL != writer->job->client)
	{
		/* external test request - forward the response to the requester */
		if (SUCCEED == zbx_ipc_client_connected(writer->job->client))
		{
			zbx_ipc_client_send(writer->job->client, ZBX_IPC_REPORTER_TEST_RESULT, message->data,
					message->size);
		}
		rm_job_free(writer->job);
	}
	else
	{
		zbx_vector_alerter_dispatch_result_t	results;
		int					status, total_num = 0, sent_num = 0;
		zbx_alerter_dispatch_result_t		*result;
		char					*error;

		zbx_vector_alerter_dispatch_result_create(&results);

		report_deserialize_response(message->data, &status, &error, &results);

		/* count the dispatch results: every entry adds to the total, */
		/* only the successful ones are counted as sent               */
		for (int i = 0; i < results.values_num; i++)
		{
			result = results.values[i];

			if (SUCCEED == result->status)
			{
				sent_num++;
			}
			else
			{
				zabbix_log(LOG_LEVEL_DEBUG, "failed to send report to \"%s\": %s", result->recipient,
						result->info);
			}

			total_num++;
		}

		rm_finish_job(manager, writer->job, status, error, sent_num, total_num);
		zbx_free(error);

		zbx_vector_alerter_dispatch_result_clear_ext(&results, zbx_alerter_dispatch_result_free);
		zbx_vector_alerter_dispatch_result_destroy(&results);
	}

	/* the job was released on both paths above, the writer is available again */
	writer->job = NULL;
	zbx_queue_ptr_push(&manager->free_writers, writer);
}

ZBX_THREAD_ENTRY(report_manager_thread, args)
{
#define	ZBX_STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
					/* once in STAT_INTERVAL seconds */
#define ZBX_SYNC_INTERVAL	60	/* report configuration refresh interval */
#define ZBX_FLUSH_INTERVAL	10

	char				*error = NULL;
	zbx_ipc_client_t		*client;
	zbx_ipc_message_t		*message;
	double				time_stat, time_idle = 0, time_now, sec, time_sync, time_flush;
	int				ret, processed_num = 0, created_num = 0,
					server_num = ((zbx_thread_args_t *)args)->info.server_num,
					process_num = ((zbx_thread_args_t *)args)->info.process_num;
	zbx_rm_t			manager;
	zbx_timespec_t			timeout;
	const zbx_thread_info_t		*info = &((zbx_thread_args_t *)args)->info;
	unsigned char			process_type = ((zbx_thread_args_t *)args)->info.process_type;
	zbx_thread_report_manager_args	*report_manager_args_in = (zbx_thread_report_manager_args *)
							(((zbx_thread_args_t *)args)->args);

	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
			server_num, get_process_type_string(process_type), process_num);

	zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

	if (FAIL == rm_init(&manager, report_manager_args_in->get_process_forks_cb_arg, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot initialize alert manager: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	zbx_db_connect(ZBX_DB_CONNECT_NORMAL);

	/* initialize statistics */
	time_stat = zbx_time();
	time_sync = 0;
	time_flush = time_stat;

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	while (ZBX_IS_RUNNING())
	{
		time_now = zbx_time();

		if (ZBX_STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [jobs created %d, processed %d, idle " ZBX_FS_DBL " sec during "
					ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num,
					created_num, processed_num, time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			created_num = 0;
			processed_num = 0;
		}

		if (ZBX_FLUSH_INTERVAL < time_now - time_flush)
		{
			rm_db_flush_reports(&manager);
			time_flush = time_now;
		}

		if (time_now - time_sync >= ZBX_SYNC_INTERVAL)
		{
			rm_update_cache(&manager);
			time_sync = time_now;
		}

		created_num += rm_schedule_jobs(&manager, (int)time(NULL));
		processed_num += rm_process_jobs(&manager);

		sec = zbx_time();

		if (sec - time_now >= 1)
		{
			timeout.sec = 0;
			timeout.ns = 0;
		}
		else
		{
			double	delay = 1 - (sec - time_now);

			timeout.sec = (int)delay;
			timeout.ns = (int)(delay * 1000000000) % 1000000000;
		}

		time_now = sec;

		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE);
		ret = zbx_ipc_service_recv(&manager.ipc, &timeout, &client, &message);
		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

		sec = zbx_time();
		zbx_update_env(get_process_type_string(process_type), sec);

		if (ZBX_IPC_RECV_IMMEDIATE != ret)
			time_idle += sec - time_now;

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_REPORTER_REGISTER:
					rm_register_writer(&manager, client, message);
					break;
				case ZBX_IPC_REPORTER_TEST:
					if (FAIL == rm_test_report(&manager, client, message, &error))
					{
						rm_send_test_error_result(client, error);
						zbx_free(error);
					}
					break;
				case ZBX_IPC_REPORTER_RESULT:
					rm_process_result(&manager, client, message);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);
	}

	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	while (1)
		zbx_sleep(SEC_PER_MIN);
}
