Skip to content

Commit

Permalink
add tv monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
smartloli committed Aug 28, 2022
1 parent 63b58c6 commit 3dda491
Show file tree
Hide file tree
Showing 15 changed files with 359 additions and 490 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public final class Common {
private Common() {
}

public static final String EFAK_VERSION = "v3.0.0";
public static final String EFAK_VERSION = "v3.0.1";
public static final String EFAK_VERSION_DOC = "efakVersion";
}

Expand Down
8 changes: 1 addition & 7 deletions efak-common/src/main/resources/works
Original file line number Diff line number Diff line change
@@ -1,7 +1 @@
192.168.1.1
192.168.1.2
192.168.1.3
192.168.1.4
192.168.1.5
192.168.1.6
192.168.31.133
127.0.0.1
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
*/
package org.smartloli.kafka.eagle.core.task.shard;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.smartloli.kafka.eagle.common.constant.ThreadConstants;
import org.smartloli.kafka.eagle.common.util.NetUtils;
import org.smartloli.kafka.eagle.common.util.SystemConfigUtils;
import org.smartloli.kafka.eagle.common.util.WorkUtils;
import org.smartloli.kafka.eagle.core.factory.KafkaFactory;
import org.smartloli.kafka.eagle.core.factory.KafkaService;
import org.smartloli.kafka.eagle.core.task.strategy.WorkNodeStrategy;

import java.util.ArrayList;
Expand All @@ -42,59 +37,6 @@
*/
public class ScheduleShardStrategy {

private final static KafkaService kafkaService = new KafkaFactory().create();

@Deprecated
public static Map<String, String> getScheduleShardSuperTask(String cluster) {
Map<String, String> subShardMaps = new HashMap<>();
List<String> hosts = WorkUtils.getWorkNodes();
int port = SystemConfigUtils.getIntProperty("efak.worknode.port");
List<WorkNodeStrategy> nodes = new ArrayList<>();
for (String host : hosts) {
if (NetUtils.telnet(host, port)) {
WorkNodeStrategy wns = new WorkNodeStrategy();
wns.setPort(port);
wns.setHost(host);
String masterHost = SystemConfigUtils.getProperty("efak.worknode.master.host");
if (!masterHost.equals(host)) {
nodes.add(wns);
}
}
}

JSONArray consumerGroups = JSON.parseArray(kafkaService.getKafkaConsumer(cluster));

JSONArray consumerGroupShard = consumerGroups;
if (nodes.size() > 0 && consumerGroupShard != null) {
int balanceNum = (consumerGroupShard.size() / nodes.size()) + 1;
JSONArray tmpConsumerGroup = new JSONArray();
int consumerGroupIndex = 0;
int nodeIndex = 0;
for (Object consumerGroup : consumerGroupShard) {
consumerGroupIndex++;
JSONObject object = (JSONObject) consumerGroup;
if (tmpConsumerGroup.size() <= balanceNum) {
tmpConsumerGroup.add(object);
if (tmpConsumerGroup.size() == balanceNum) {
subShardMaps.put(nodes.get(nodeIndex).getHost(), tmpConsumerGroup.toString());
nodeIndex++;
tmpConsumerGroup.clear();
}

// final result dataset
if (consumerGroupIndex == consumerGroupShard.size()) {
subShardMaps.put(nodes.get(nodeIndex).getHost(), tmpConsumerGroup.toString());
nodeIndex = 0;
tmpConsumerGroup.clear();
}
}
}

}

return subShardMaps;
}

public static Map<String, List<String>> getScheduleShardTask() {
List<String> hosts = WorkUtils.getWorkNodes();
int port = SystemConfigUtils.getIntProperty("efak.worknode.port");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
*/
package org.smartloli.kafka.eagle.sysout;

import org.smartloli.kafka.eagle.core.task.shard.ScheduleShardStrategy;

import java.util.List;
import java.util.Map;

/**
* // NOTE
*
Expand All @@ -27,13 +32,10 @@
public class TestStrategyRange {

public static void main(String[] args) {
// if ((end - start) <= 20) {
// submit();
// } else {
// long middle = (start + end) / 2;
// }
System.out.println((0 + 30) / 2);

Map<String, List<String>> shardTasks = ScheduleShardStrategy.getScheduleShardTask();
for (Map.Entry<String, List<String>> entry : shardTasks.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.smartloli.kafka.eagle.common.util.KConstants;
import org.smartloli.kafka.eagle.common.util.KConstants.Topic;
import org.smartloli.kafka.eagle.web.service.BScreenService;
import org.smartloli.kafka.eagle.web.service.TVBScreenService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -46,6 +47,9 @@ public class BScreenController {
@Autowired
private BScreenService bscreen;

@Autowired
private TVBScreenService tvbScreenService;

/**
* Big screen viewer.
*/
Expand Down Expand Up @@ -86,6 +90,7 @@ public void getTopicTotalLogSizeAjax(HttpServletResponse response, HttpServletRe
}
}


/**
* Get producer history bar data by ajax.
*/
Expand Down Expand Up @@ -134,4 +139,34 @@ public void getTopicTotalCapacityAjax(HttpServletResponse response, HttpServletR
}
}

/**
* Get producer and consumer real rate data by ajax.
*/
@RequestMapping(value = "/get/tv/dashboard/mid/result/ajax", method = RequestMethod.GET)
public void getTVDashboardMidResultAjax(HttpServletResponse response, HttpServletRequest request) {
HttpSession session = request.getSession();
String clusterAlias = session.getAttribute(KConstants.SessionAlias.CLUSTER_ALIAS).toString();
try {
byte[] output = tvbScreenService.getClusterInfo(clusterAlias).getBytes();
BaseController.response(output, response);
} catch (Exception ex) {
ex.printStackTrace();
}
}

/**
* Get producer and consumer real rate data by ajax.
*/
@RequestMapping(value = "/get/tv/dashboard/mid/result/worknode/ajax", method = RequestMethod.GET)
public void getTVDashboardMidResultOfWorkNodeAjax(HttpServletResponse response, HttpServletRequest request) {
HttpSession session = request.getSession();
String clusterAlias = session.getAttribute(KConstants.SessionAlias.CLUSTER_ALIAS).toString();
try {
byte[] output = tvbScreenService.getClusterOfWorkNodeInfo(clusterAlias).getBytes();
BaseController.response(output, response);
} catch (Exception ex) {
ex.printStackTrace();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void getDashboardAreaChartAjax(HttpServletResponse response, HttpServletR
param.put("stime", CalendarUtils.getCustomLastDay(0));
param.put("etime", CalendarUtils.getCustomLastDay(0));
param.put("type", KConstants.CollectorType.KAFKA);
param.put("modules", Arrays.asList(KConstants.MBean.MESSAGEIN, KConstants.MBean.BYTEIN, KConstants.MBean.BYTEOUT, KConstants.MBean.OSFREEMEMORY));
param.put("modules", Arrays.asList(KConstants.MBean.MESSAGEIN, KConstants.MBean.BYTEIN, KConstants.MBean.BYTEOUT, KConstants.MBean.OSFREEMEMORY, KConstants.MBean.CPUUSED, KConstants.MBean.FAILEDFETCHREQUEST, KConstants.MBean.TOTALFETCHREQUESTSPERSEC, KConstants.MBean.TOTALPRODUCEREQUESTSPERSEC));
String target = metricsService.query(param);
if (StringUtils.isEmpty(target)) {
target = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public interface TVBScreenService {
/**
* Include cluster name, mem, capacity, logsize, groups .
*/
public String getTVMidOfKafka(String clusterAlias);
public String getClusterInfo(String clusterAlias);

public String getCpuAndMem(String clusterAlias);
public String getClusterOfWorkNodeInfo(String clusterAlias);

}
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,12 @@ public JSONArray getTopicRank(Map<String, Object> params) {
if (i < topicRank.size()) {
object.put("id", index);
object.put("topic", "<a href='/topic/meta/" + topicRank.get(i).getTopic() + "/'>" + topicRank.get(i).getTopic() + "</a>");
object.put("topic_text", topicRank.get(i).getTopic());
object.put("capacity", StrUtils.stringify(topicRank.get(i).getTvalue()));
} else {
object.put("id", index);
object.put("topic", "");
object.put("topic_text", "");
object.put("capacity", "");
}
index++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public String query(Map<String, Object> params) throws ParseException {
JSONArray replicationBytesIns = new JSONArray();

JSONArray osFreeMems = new JSONArray();
JSONArray cpuUsed = new JSONArray();

JSONArray zkSendPackets = new JSONArray();
JSONArray zkReceivedPackets = new JSONArray();
Expand Down Expand Up @@ -313,6 +314,9 @@ public String query(Map<String, Object> params) throws ParseException {
case MBean.OSFREEMEMORY:
assembly(osFreeMems, kpi);
break;
case MBean.CPUUSED:
assembly(cpuUsed, kpi);
break;
default:
break;
}
Expand All @@ -334,6 +338,7 @@ public String query(Map<String, Object> params) throws ParseException {
target.put("replicationBytesIns", replicationBytesIns);
target.put("replicationBytesOuts", replicationBytesOuts);
target.put("osFreeMems", osFreeMems);
target.put("cpuUsed", cpuUsed);

return target.toJSONString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@
package org.smartloli.kafka.eagle.web.service.impl;

import com.alibaba.fastjson.JSONObject;
import org.smartloli.kafka.eagle.common.protocol.topic.TopicRank;
import org.smartloli.kafka.eagle.common.util.*;
import org.smartloli.kafka.eagle.core.factory.KafkaFactory;
import org.smartloli.kafka.eagle.core.factory.KafkaService;
import org.smartloli.kafka.eagle.core.factory.v2.BrokerFactory;
import org.smartloli.kafka.eagle.core.factory.v2.BrokerService;
import org.smartloli.kafka.eagle.core.task.schedule.JobClient;
import org.smartloli.kafka.eagle.web.dao.TopicDao;
import org.smartloli.kafka.eagle.web.service.TVBScreenService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
* EFAK TV Monitor.
Expand All @@ -29,26 +40,68 @@
* <p>
* Created by Aug 07, 2022
*/
@Service
public class TVBScreenServiceImpl implements TVBScreenService {

@Autowired
private TopicDao topicDao;

/**
* Kafka service interface.
*/
private KafkaService kafkaService = new KafkaFactory().create();
private static BrokerService brokerService = new BrokerFactory().create();

@Override
public String getTVMidOfKafka(String clusterAlias) {
public String getClusterInfo(String clusterAlias) {
JSONObject object = new JSONObject();

object.put("cluster", clusterAlias);
object.put("groups", kafkaService.getKafkaConsumerGroups(clusterAlias));
object.put("version", KConstants.Common.EFAK_VERSION);

Map<String, Object> params = new HashMap<>();
params.put("cluster", clusterAlias);
params.put("topics", brokerService.topicList(clusterAlias));
params.put("size", brokerService.topicList(clusterAlias).size());
params.put("tday", CalendarUtils.getCustomDate("yyyyMMdd"));
long totalRecords = topicDao.getBScreenTotalRecords(params);

if (SystemConfigUtils.getBooleanProperty("efak.distributed.enable")) {
object.put("mode", "Distribute");
} else {
object.put("mode", "Standalone");
}

Map<String, Object> producerParams = new HashMap<>();
producerParams.put("cluster", clusterAlias);
producerParams.put("topic", KConstants.Topic.PRODUCER_THREADS_KEY);
producerParams.put("tkey", KConstants.Topic.PRODUCER_THREADS);
TopicRank tr = topicDao.readProducerThreads(producerParams);
object.put("app", tr == null ? 0L : tr.getTvalue());
Map<String, Object> capacityParams = new HashMap<>();
capacityParams.put("cluster", clusterAlias);
capacityParams.put("tkey", KConstants.Topic.CAPACITY);
JSONObject capacity = StrUtils.stringifyByObject(topicDao.getTopicCapacity(capacityParams));
object.put("capacity", capacity.getString("size"));
object.put("capacityType", capacity.getString("type"));

return object.toJSONString();
return object.toString();
}

@Override
public String getCpuAndMem(String clusterAlias) {
return null;
public String getClusterOfWorkNodeInfo(String clusterAlias) {
JSONObject object = new JSONObject();
int worknode = 0;
try {
worknode = JobClient.getWorkNodeMetrics(clusterAlias).size();
} catch (Exception e) {
e.printStackTrace();
LoggerUtils.print(this.getClass()).error(e.toString());
}


object.put("worknode", worknode);
return object.toString();

}
}
4 changes: 2 additions & 2 deletions efak-web/src/main/webapp/WEB-INF/views/public/pro/navbar.jsp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
<ul>
<li><a href="/"><i class="bx bx-laptop"></i>Overview</a>
</li>
<%-- <li><a href="/tv"><i class="bx bx-desktop"></i>TV Dashboard</a>--%>
<%-- </li>--%>
<li><a href="/tv"><i class="bx bx-desktop"></i>TV Dashboard</a>
</li>
</ul>
</li>
<li class="menu-label">Message</li>
Expand Down
Loading

0 comments on commit 3dda491

Please sign in to comment.