当前位置:网站首页>Keda 2.7.1 brief analysis of scaledjob code
Keda 2.7.1 brief analysis of scaledjob code
2022-06-26 02:05:00 【weixin_ forty million four hundred and fifty-five thousand one 】
scaledJob Official description of use :As an alternate to scaling event-driven code as deployments you can also run and scale your code as Kubernetes Jobs.
The essence is through various metric control job Number , The core code is as follows go file
- scale_jobs.go
- scaledjob_controller.go
scaledjob_controller and scaledobject_controller similar , adopt requestScaleLoop function :
// requestScaleLoop request ScaleLoop handler for the respective ScaledJob
func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
logger.V(1).Info("Starting a new ScaleLoop")
return r.scaleHandler.HandleScalableObject(ctx, scaledJob)
}
call scale_handler.go Of HandleScalableObject function ,HandleScalableObject At the same time object and job.
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
case *kedav1alpha1.ScaledJob:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
}
return nil
scale_jobs The core code is RequestJobScale and createJobs, Finally through batchv1 “k8s.io/api/batch/v1” establish / adjustment job Number .
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)
runningJobCount := e.getRunningJobCount(ctx, scaledJob)
pendingJobCount := e.getPendingJobCount(ctx, scaledJob)
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)
logger.Info("Scaling Jobs", "Number of pending Jobs ", pendingJobCount)
effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, scaledJob.MaxReplicaCount())
if effectiveMaxScale < 0 {
effectiveMaxScale = 0
}
if isActive {
logger.V(1).Info("At least one scaler is active")
now := metav1.Now()
scaledJob.Status.LastActiveTime = &now
err := e.updateLastActiveTime(ctx, logger, scaledJob)
if err != nil {
logger.Error(err, "Failed to update last active time")
}
e.createJobs(ctx, logger, scaledJob, scaleTo, effectiveMaxScale)
} else {
logger.V(1).Info("No change in activity")
}
condition := scaledJob.Status.Conditions.GetActiveCondition()
if condition.IsUnknown() || condition.IsTrue() != isActive {
if isActive {
if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active"); err != nil {
logger.Error(err, "Error setting active condition when triggers are active")
return
}
} else {
if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
logger.Error(err, "Error setting active condition when triggers are not active")
return
}
}
}
err := e.cleanUp(ctx, scaledJob)
if err != nil {
logger.Error(err, "Failed to cleanUp jobs")
}
}
func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
scaledJob.Spec.JobTargetRef.Template.GenerateName = scaledJob.GetName() + "-"
if scaledJob.Spec.JobTargetRef.Template.Labels == nil {
scaledJob.Spec.JobTargetRef.Template.Labels = map[string]string{
}
}
scaledJob.Spec.JobTargetRef.Template.Labels["scaledjob.keda.sh/name"] = scaledJob.GetName()
logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
if scaleTo > maxScale {
scaleTo = maxScale
}
logger.Info("Creating jobs", "Number of jobs", scaleTo)
labels := map[string]string{
"app.kubernetes.io/name": scaledJob.GetName(),
"app.kubernetes.io/version": version.Version,
"app.kubernetes.io/part-of": scaledJob.GetName(),
"app.kubernetes.io/managed-by": "keda-operator",
"scaledjob.keda.sh/name": scaledJob.GetName(),
}
for key, value := range scaledJob.ObjectMeta.Labels {
labels[key] = value
}
for i := 0; i < int(scaleTo); i++ {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: scaledJob.GetName() + "-",
Namespace: scaledJob.GetNamespace(),
Labels: labels,
},
Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
}
// Job doesn't allow RestartPolicyAlways, it seems like this value is set by the client as a default one,
// we should set this property to allowed value in that case
if job.Spec.Template.Spec.RestartPolicy == "" {
logger.V(1).Info("Job RestartPolicy is not set, setting it to 'OnFailure', to avoid setting it to the client's default value 'Always'")
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
}
// Set ScaledJob instance as the owner and controller
err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme)
if err != nil {
logger.Error(err, "Failed to set ScaledJob as the owner of the new Job")
}
err = e.client.Create(ctx, job)
if err != nil {
logger.Error(err, "Failed to create a new Job")
}
}
logger.Info("Created jobs", "Number of jobs", scaleTo)
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", scaleTo)
}
Other useful URL:
https://livewyer.io/blog/2021/06/17/keda-showcase-autoscaling-based-on-prometheus-redis/
https://blog.csdn.net/github_19391267/article/details/109634935
边栏推荐
- 如何使用命令将文件夹中的文件名(包括路径)写入到txt文件中
- 求n乘阶之和
- 安装了Visual Studio 2013 Redistributable,mysql还是安装失败
- Input 3 integers and output them from large to small
- readv & writev
- Abnova anti GBA monoclonal antibody solution
- V4L2+QT视频优化策略
- FPGA实现图像二值形态学滤波——腐蚀膨胀
- Sweet girl lisixia was invited to be the little host of the global finals of the sixth season perfect child model
- 静态库动态库的使用
猜你喜欢

前置++,后置++与前置--与后置--(++a,a++与--a,a--)

通俗易懂C语言关键字static

Playful girl wangyixuan was invited to serve as the Promotion Ambassador for the global finals of the sixth season perfect children's model

Sweet cool girl jinshuyi was invited to be the spokesperson for the global finals of the sixth season perfect children's model

Energetic girl wangyujie was invited to be the spokesperson for the global finals of the sixth season perfect children's model

A lost note for konjaku beginner

Exploring temporary information for dynamic network embedding

recvmsg & sendmsg

图形渲染管线

biggan:large scale gan training for high fidelity natural image synthesis
随机推荐
Dataframe extracts data from a column and converts it into a list
Getting to know OpenGL
Energetic girl wangyujie was invited to be the spokesperson for the global finals of the sixth season perfect children's model
Use of static library and dynamic library
readv & writev
Disruptor (I) sequence
宁要一个完成,不要千万个开始(转载自豆瓣)
Find the multiplication order of n
Ndk20b ffmpeg4.2.2 compilation and integration
buffer
Prompt to update to the latest debug version during vscode debugging
shell curl 执行脚本,带传参数,自定义参数
如何制定一个可实现的年度目标?
How to use commands to write file names (including paths) in folders to txt files
Characteristics and related specificity of Papain
求n的乘阶
Chinese and English instructions of collagen enzyme Worthington
The role of xargs
Introduction to gun make (1)
Multi type study of Worthington collagen protease