当前位置:网站首页>keda 2.7.1 scaledJob 代码简要分析
keda 2.7.1 scaledJob 代码简要分析
2022-06-26 00:22:00 【weixin_40455124】
scaledJob 用途官方描述:As an alternate to scaling event-driven code as deployments you can also run and scale your code as Kubernetes Jobs.
其本质是根据各种 metric 来控制 Job 的数量,核心代码位于以下两个 Go 文件中:
- scale_jobs.go
- scaledjob_controller.go
scaledjob_controller和scaledobject_controller类似,通过requestScaleLoop函数:
// requestScaleLoop logs that a new scale loop is being started and hands the
// given ScaledJob over to the reconciler's scale handler via
// HandleScalableObject. The handler's error, if any, is returned unchanged.
func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
	logger.V(1).Info("Starting a new ScaleLoop")

	if err := r.scaleHandler.HandleScalableObject(ctx, scaledJob); err != nil {
		return err
	}
	return nil
}
requestScaleLoop 调用 scale_handler.go 中的 HandleScalableObject 函数;HandleScalableObject 同时处理 ScaledObject 和 ScaledJob 两种对象:
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
case *kedav1alpha1.ScaledJob:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
}
return nil
scale_jobs.go 的核心代码是 RequestJobScale 和 createJobs,最终通过 batchv1(`k8s.io/api/batch/v1`)创建 Job,从而调整 Job 的数量。
// RequestJobScale performs one scaling pass for the given ScaledJob.
//
// It logs the current running and pending Job counts, asks the ScaledJob's
// scaling strategy for the effective maximum number of Jobs (clamped to be
// non-negative), and, when isActive is true, records the last active time
// and creates Jobs through createJobs. It then updates the Active condition
// if it is unknown or disagrees with isActive, and finally runs cleanUp.
//
// Failures to update the last active time or to clean up are only logged.
// A failure to set the Active condition is logged and ends the pass early,
// before cleanUp is called.
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
	logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

	running := e.getRunningJobCount(ctx, scaledJob)
	pending := e.getPendingJobCount(ctx, scaledJob)
	logger.Info("Scaling Jobs", "Number of running Jobs", running)
	logger.Info("Scaling Jobs", "Number of pending Jobs ", pending)

	strategy := NewScalingStrategy(logger, scaledJob)
	effectiveMaxScale := strategy.GetEffectiveMaxScale(maxScale, running, pending, scaledJob.MaxReplicaCount())
	if effectiveMaxScale < 0 {
		// Never pass a negative ceiling down to createJobs.
		effectiveMaxScale = 0
	}

	if !isActive {
		logger.V(1).Info("No change in activity")
	} else {
		logger.V(1).Info("At least one scaler is active")

		now := metav1.Now()
		scaledJob.Status.LastActiveTime = &now
		if err := e.updateLastActiveTime(ctx, logger, scaledJob); err != nil {
			// Best effort: job creation still proceeds.
			logger.Error(err, "Failed to update last active time")
		}

		e.createJobs(ctx, logger, scaledJob, scaleTo, effectiveMaxScale)
	}

	// Only touch the Active condition when it would actually change.
	condition := scaledJob.Status.Conditions.GetActiveCondition()
	if condition.IsUnknown() || condition.IsTrue() != isActive {
		var (
			err     error
			failMsg string
		)
		if isActive {
			failMsg = "Error setting active condition when triggers are active"
			err = e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active")
		} else {
			failMsg = "Error setting active condition when triggers are not active"
			err = e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active")
		}
		if err != nil {
			logger.Error(err, failMsg)
			return
		}
	}

	if err := e.cleanUp(ctx, scaledJob); err != nil {
		logger.Error(err, "Failed to cleanUp jobs")
	}
}
// createJobs creates Kubernetes Jobs from the ScaledJob's job template.
//
// The number of Jobs requested is scaleTo, capped at maxScale. If the capped
// value is zero or negative nothing is created and no "created" log line or
// event is emitted, so idle polling cycles do not produce misleading
// "Created 0 jobs" events.
//
// Before creating Jobs, the ScaledJob's in-memory pod template is given a
// GenerateName prefix and a "scaledjob.keda.sh/name" label. Every Job gets a
// default label set; labels on the ScaledJob itself are copied on top and
// therefore override defaults with the same key.
//
// Errors from setting the owner reference or creating a Job are logged and do
// not abort the loop. The final log line and event report the number of Jobs
// that were actually created, not the number requested.
func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
	scaledJob.Spec.JobTargetRef.Template.GenerateName = scaledJob.GetName() + "-"
	if scaledJob.Spec.JobTargetRef.Template.Labels == nil {
		scaledJob.Spec.JobTargetRef.Template.Labels = map[string]string{}
	}
	scaledJob.Spec.JobTargetRef.Template.Labels["scaledjob.keda.sh/name"] = scaledJob.GetName()

	logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
	if scaleTo > maxScale {
		scaleTo = maxScale
	}
	if scaleTo <= 0 {
		// Nothing to do; avoid logging and recording a bogus creation event.
		logger.V(1).Info("No jobs to create", "Number of jobs", scaleTo)
		return
	}
	logger.Info("Creating jobs", "Number of jobs", scaleTo)

	labels := map[string]string{
		"app.kubernetes.io/name":       scaledJob.GetName(),
		"app.kubernetes.io/version":    version.Version,
		"app.kubernetes.io/part-of":    scaledJob.GetName(),
		"app.kubernetes.io/managed-by": "keda-operator",
		"scaledjob.keda.sh/name":       scaledJob.GetName(),
	}
	// Labels set on the ScaledJob take precedence over the defaults above.
	for key, value := range scaledJob.ObjectMeta.Labels {
		labels[key] = value
	}

	var created int64
	for i := int64(0); i < scaleTo; i++ {
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: scaledJob.GetName() + "-",
				Namespace:    scaledJob.GetNamespace(),
				Labels:       labels,
			},
			Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
		}

		// Job doesn't allow RestartPolicyAlways, it seems like this value is set by the client as a default one,
		// we should set this property to allowed value in that case
		if job.Spec.Template.Spec.RestartPolicy == "" {
			logger.V(1).Info("Job RestartPolicy is not set, setting it to 'OnFailure', to avoid setting it to the client's default value 'Always'")
			job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
		}

		// Set ScaledJob instance as the owner and controller
		if err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme); err != nil {
			logger.Error(err, "Failed to set ScaledJob as the owner of the new Job")
		}

		if err := e.client.Create(ctx, job); err != nil {
			logger.Error(err, "Failed to create a new Job")
			continue
		}
		created++
	}

	logger.Info("Created jobs", "Number of jobs", created)
	if created > 0 {
		e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", created)
	}
}
其它有用URL:
https://livewyer.io/blog/2021/06/17/keda-showcase-autoscaling-based-on-prometheus-redis/
https://blog.csdn.net/github_19391267/article/details/109634935
边栏推荐
- PTA class a simulated 8th bomb: 1164-1167
- Textcnn paper Interpretation -- revolutionary neural networks for sense classification
- 大周建议自媒体博主前期做这4件事
- One stop solution EMQ for hundreds of millions of communication of Internet of things
- UN make (6) makefile的条件执行
- PTA class a simulated first bomb: 1132-1135
- The answer skills and examples of practical cases of the second construction company are full of essence
- CyCa children's physical etiquette Yueqing City training results assessment successfully concluded
- 论文阅读 Exploring Temporal Information for Dynamic Network Embedding
- Introduction to gun make (1)
猜你喜欢

分布式系统(二)分布式事务的理解

通俗易懂C語言關鍵字static

readv & writev

Sweet cool girl jinshuyi was invited to be the spokesperson for the global finals of the sixth season perfect children's model

About vs scanf, 'scanf' appears: this function or variable may be unsafe Solutions to the problem of consumer usi

Talking about interface test (2)

recvmsg & sendmsg

GNN (graph neural network) introduction vernacular

Android system startup security

tos cos dscp 区别和作用
随机推荐
Find the multiplication order of n
PTA class a simulated ninth bullet: 1114-1117
web测试
Focal loss
Easyexcel read file
Redis的使用
Easy to understand C language keyword static
Postman断言对应脚本的解释
What happens from entering a web address in the browser's input box to seeing the contents of the web page?
Pixel6 unlock bootloader
Abnova丨CMV CISH 探头解决方案
弹性蛋白酶的用途和化学性质
通俗易懂C語言關鍵字static
biggan:large scale gan training for high fidelity natural image synthesis
PTA class a simulated fifth bomb: 1148-1151
秀场精灵陈梓桐 受邀担任第六季完美童模全球总决赛首席体验官
求n的乘阶
Perfdog
浅谈接口测试(二)
Detailed explanation of WiFi related knowledge